Address k8s plugin code smells reported by sonar
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21
22 # Lifecycle interface calls for containerized components
23
24 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
25 from . import cloudify_importer
26
27 import time, copy
28 import json
29 from cloudify import ctx
30 from cloudify.decorators import operation
31 from cloudify.exceptions import NonRecoverableError, RecoverableError
32 from onap_dcae_dcaepolicy_lib import Policies
33 from k8splugin import discovery as dis
34 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
35     merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
36 from k8splugin.exceptions import DockerPluginDeploymentError
37 from k8splugin import utils
38 from configure import configure
39 import k8sclient
40
41 # Get configuration
42 plugin_conf = configure.configure()
43 CONSUL_HOST = plugin_conf.get("consul_host")
44 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
45 DCAE_NAMESPACE = plugin_conf.get("namespace")
46 DEFAULT_MAX_WAIT = plugin_conf.get("max_wait", 1800)
47 DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
48 COMPONENT_CA_CERT_PATH = plugin_conf.get("tls").get("component_ca_cert_path")
49 CBS_BASE_URL = plugin_conf.get("cbs").get("base_url")
50
51 # Used to construct delivery urls for data router subscribers. Data router in FTL
52 # requires https but this author believes that ONAP is to be defaulted to http.
53 DEFAULT_SCHEME = "http"
54
55 # Property keys
56 SERVICE_COMPONENT_NAME = "service_component_name"
57 CONTAINER_ID = "container_id"
58 APPLICATION_CONFIG = "application_config"
59 K8S_DEPLOYMENT = "k8s_deployment"
60 RESOURCE_KW = "resource_config"
61 LOCATION_ID = "location_id"
62
63 # Utility methods
64
65 # Lifecycle interface calls for dcae.nodes.DockerContainer
66
67 def _setup_for_discovery(**kwargs):
68     """Setup for config discovery"""
69     try:
70         name = kwargs['name']
71         application_config = kwargs[APPLICATION_CONFIG]
72
73         # NOTE: application_config is no longer a json string and is inputed as a
74         # YAML map which translates to a dict. We don't have to do any
75         # preprocessing anymore.
76         conn = dis.create_kv_conn(CONSUL_HOST)
77         dis.push_service_component_config(conn, name, application_config)
78         return kwargs
79     except dis.DiscoveryConnectionError as e:
80         raise RecoverableError(e)
81     except Exception as e:
82         ctx.logger.error("Unexpected error while pushing configuration: {0}"
83                 .format(str(e)))
84         raise NonRecoverableError(e)
85
86 def _generate_component_name(**kwargs):
87     """Generate component name"""
88     service_component_type = kwargs['service_component_type']
89     name_override = kwargs['service_component_name_override']
90
91     kwargs['name'] = name_override if name_override \
92             else dis.generate_service_component_name(service_component_type)
93     return kwargs
94
95 def _done_for_create(**kwargs):
96     """Wrap up create operation"""
97     name = kwargs['name']
98     kwargs[SERVICE_COMPONENT_NAME] = name
99     # All updates to the runtime_properties happens here. I don't see a reason
100     # why we shouldn't do this because the context is not being mutated by
101     # something else and will keep the other functions pure (pure in the sense
102     # not dealing with CloudifyContext).
103     ctx.instance.runtime_properties.update(kwargs)
104     ctx.logger.info("Done setting up: {0}".format(name))
105     return kwargs
106
107 def _get_resources(**kwargs):
108     if kwargs is not None:
109         ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
110         return kwargs.get(RESOURCE_KW)
111     ctx.logger.info("set resources to None")
112     return None
113
114 def  _get_location():
115     ''' Get the k8s location property.  Set to the default if the property is missing, None, or zero-length '''
116     return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
117         else DEFAULT_K8S_LOCATION
118
119 @merge_inputs_for_create
120 @monkeypatch_loggers
121 @Policies.gather_policies_to_node()
122 @operation
123 def create_for_components(**create_inputs):
124     """Create step for service components
125
126     This interface is responsible for:
127
128     1. Generating service component name
129     2. Populating config information into Consul
130     """
131     _done_for_create(
132             **_setup_for_discovery(
133                 **_enhance_docker_params(
134                     **_generate_component_name(
135                         **create_inputs))))
136
137
138 def _parse_streams(**kwargs):
139     """Parse streams and setup for DMaaP plugin"""
140     # The DMaaP plugin requires this plugin to set the runtime properties
141     # keyed by the node name.
142     for stream in kwargs["streams_publishes"]:
143         kwargs[stream["name"]] = stream
144
145     for stream in kwargs["streams_subscribes"]:
146         if stream["type"] == "data_router":
147
148             # Don't want to mutate the source
149             stream = copy.deepcopy(stream)
150
151             # Set up the delivery URL
152             # Using service_component_name as the host name in the subscriber URL
153             # will work in a single-cluster ONAP deployment.  Whether it will also work
154             # in a multi-cluster ONAP deployment--with a central location and one or
155             # more remote ("edge") locations depends on how networking and DNS is set
156             # up in a multi-cluster deployment
157             service_component_name = kwargs["name"]
158             ports, _ = k8sclient.parse_ports(kwargs["ports"])
159             dport, _ = ports[0]
160             subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
161
162             scheme = stream.get("scheme", DEFAULT_SCHEME)
163             if "route" not in stream:
164                 raise NonRecoverableError("'route' key missing from data router subscriber")
165             path = stream["route"]
166             stream["delivery_url"] = "{scheme}://{host}/{path}".format(
167                     scheme=scheme, host=subscriber_host, path=path)
168
169             # If username and password has not been provided then generate it. The
170             # DMaaP plugin doesn't generate for subscribers. The generation code
171             # and length of username password has been lifted from the DMaaP
172             # plugin.
173             if not stream.get("username", None):
174                 stream["username"] = utils.random_string(8)
175             if not stream.get("password", None):
176                 stream["password"] = utils.random_string(10)
177
178         kwargs[stream["name"]] = stream
179
180     return kwargs
181
182 @merge_inputs_for_create
183 @monkeypatch_loggers
184 @Policies.gather_policies_to_node()
185 @operation
186 def create_for_components_with_streams(**create_inputs):
187     """Create step for service components that use DMaaP
188
189     This interface is responsible for:
190
191     1. Generating service component name
192     2. Setup runtime properties for DMaaP plugin
193     3. Populating application config into Consul
194     """
195     _done_for_create(
196             **_setup_for_discovery(
197                 **_parse_streams(
198                     **_enhance_docker_params(
199                         **_generate_component_name(
200                             **create_inputs)))))
201
202 @merge_inputs_for_create
203 @monkeypatch_loggers
204 @operation
205 def create_for_platforms(**create_inputs):
206     """Create step for platform components
207
208     This interface is responible for:
209
210     1. Populating config information into Consul
211     """
212     _done_for_create(
213             **_setup_for_discovery(
214                 **create_inputs))
215
216 def _verify_k8s_deployment(location, service_component_name, max_wait):
217     """Verify that the k8s Deployment is ready
218
219     Args:
220     -----
221     location (string): location of the k8s cluster where the component was deployed
222     service_component_name: component's service component name
223     max_wait (integer): limit to how may attempts to make which translates to
224         seconds because each sleep is one second. 0 means infinite.
225
226     Return:
227     -------
228     True if deployment is ready within the maximum wait time, False otherwise
229     """
230     num_attempts = 1
231
232     while True:
233         if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
234             return True
235         else:
236             num_attempts += 1
237
238             if max_wait > 0 and max_wait < num_attempts:
239                 return False
240
241             time.sleep(1)
242
243     return True
244
245 def _create_and_start_container(container_name, image, **kwargs):
246     '''
247     This will create a k8s Deployment and, if needed, a k8s Service or two.
248     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
249     We're not exposing k8s to the component developer and the blueprint author.
250     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
251     the details from the component developer and the blueprint author.)
252
253     kwargs may have:
254         - volumes:  array of volume objects, where a volume object is:
255             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
256         - ports: array of strings in the form "container_port:host_port"
257         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
258         - always_pull: boolean.  If true, sets image pull policy to "Always"
259           so that a fresh copy of the image is always pull.  Otherwise, sets
260           image pull policy to "IfNotPresent"
261         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
262         - log_info: an object with info for setting up ELK logging, with the form:
263             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
264         - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
265             {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" }
266         - replicas: number of replicas to be launched initially
267         - readiness: object with information needed to create a readiness check
268         - liveness: object with information needed to create a liveness check
269         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
270     '''
271     tls_info = kwargs.get("tls_info")
272     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
273             "CONFIG_BINDING_SERVICE": "config-binding-service",
274             "DCAE_CA_CERTPATH" : "{0}/cacert.pem".format(tls_info["cert_directory"]) if (tls_info and tls_info["cert_directory"]) else COMPONENT_CA_CERT_PATH,
275             "CBS_CONFIG_URL" : "{0}/{1}".format(CBS_BASE_URL, container_name)
276           }
277     env.update(kwargs.get("envs", {}))
278     ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
279     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
280     replicas = kwargs.get("replicas", 1)
281     resource_config = _get_resources(**kwargs)
282     _, dep = k8sclient.deploy(DCAE_NAMESPACE,
283                      container_name,
284                      image,
285                      replicas=replicas,
286                      always_pull=kwargs.get("always_pull_image", False),
287                      k8sconfig=plugin_conf,
288                      resources=resource_config,
289                      volumes=kwargs.get("volumes", []),
290                      ports=kwargs.get("ports", []),
291                      msb_list=kwargs.get("msb_list"),
292                      tls_info=kwargs.get("tls_info"),
293                      env=env,
294                      labels=kwargs.get("labels", {}),
295                      log_info=kwargs.get("log_info"),
296                      readiness=kwargs.get("readiness"),
297                      liveness=kwargs.get("liveness"),
298                      k8s_location=kwargs.get("k8s_location"))
299
300     # Capture the result of deployment for future use
301     ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
302     kwargs[K8S_DEPLOYMENT] = dep
303     ctx.instance.runtime_properties["replicas"] = replicas
304     ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
305     return kwargs
306
307 def _parse_cloudify_context(**kwargs):
308     """Parse Cloudify context
309
310     Extract what is needed. This is impure function because it requires ctx.
311     """
312     kwargs["deployment_id"] = ctx.deployment.id
313
314     # Set some labels for the Kubernetes pods
315     # The name segment is required and must be 63 characters or less
316     kwargs["labels"] = {
317         "cfydeployment" : ctx.deployment.id,
318         "cfynode": ctx.node.name[:63],
319         "cfynodeinstance": ctx.instance.id[:63]
320     }
321
322     # Pick up the centralized logging info
323     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
324         kwargs["log_info"] = ctx.node.properties["log_info"]
325
326     # Pick up TLS info if present
327     if "tls_info" in ctx.node.properties:
328         kwargs["tls_info"] = ctx.node.properties["tls_info"]
329
330     # Pick up replica count and always_pull_image flag
331     if "replicas" in ctx.node.properties:
332         kwargs["replicas"] = ctx.node.properties["replicas"]
333     if "always_pull_image" in ctx.node.properties:
334         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
335
336     # Pick up location
337     kwargs["k8s_location"] = _get_location()
338
339     return kwargs
340
341 def _enhance_docker_params(**kwargs):
342     '''
343     Set up Docker environment variables and readiness/liveness check info
344     and inject into kwargs.
345     '''
346
347     # Get info for setting up readiness/liveness probe, if present
348     docker_config = kwargs.get("docker_config", {})
349     if "healthcheck" in docker_config:
350         kwargs["readiness"] = docker_config["healthcheck"]
351     if "livehealthcheck" in docker_config:
352         kwargs["liveness"] = docker_config["livehealthcheck"]
353
354     envs = kwargs.get("envs", {})
355
356     # Set tags on this component for its Consul registration as a service
357     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
358     tags = [ str(tag) for tag in tags if tag is not None ]
359     # Registrator will use this to register this component with tags. Must be
360     # comma delimited.
361     envs["SERVICE_TAGS"] = ",".join(tags)
362
363     kwargs["envs"] = envs
364
365     def combine_params(key, docker_config, kwargs):
366         v = docker_config.get(key, []) + kwargs.get(key, [])
367         kwargs[key] = v
368         return kwargs
369
370     # Add the lists of ports and volumes unintelligently - meaning just add the
371     # lists together with no deduping.
372     kwargs = combine_params("ports", docker_config, kwargs)
373     kwargs = combine_params("volumes", docker_config, kwargs)
374
375
376     return kwargs
377
378 def _create_and_start_component(**kwargs):
379     """Create and start component (container)"""
380     image = kwargs["image"]
381     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
382     # Need to be picky and manually select out pieces because just using kwargs
383     # which contains everything confused the execution of
384     # _create_and_start_container because duplicate variables exist
385     sub_kwargs = {
386         "volumes": kwargs.get("volumes", []),
387         "ports": kwargs.get("ports", None),
388         "envs": kwargs.get("envs", {}),
389         "log_info": kwargs.get("log_info", {}),
390         "tls_info": kwargs.get("tls_info", {}),
391         "labels": kwargs.get("labels", {}),
392         "resource_config": kwargs.get("resource_config",{}),
393         "readiness": kwargs.get("readiness",{}),
394         "liveness": kwargs.get("liveness",{}),
395         "k8s_location": kwargs.get("k8s_location")}
396     returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
397     kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]
398
399     return kwargs
400
401 def _verify_component(**kwargs):
402     """Verify deployment is ready"""
403     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
404
405     max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
406     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
407
408     if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
409         ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
410     else:
411         # The component did not become ready within the "max_wait" interval.
412         # Delete the k8s components created already and remove configuration from Consul.
413         ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
414         if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
415             ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
416             k8sclient.undeploy(kwargs[K8S_DEPLOYMENT])
417             ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
418         cleanup_discovery(**kwargs)
419         raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
420
421     return kwargs
422
423 def _done_for_start(**kwargs):
424     ctx.instance.runtime_properties.update(kwargs)
425     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
426     return kwargs
427
428 def _setup_msb_registration(service_name, msb_reg):
429     return {
430         "serviceName" : service_name,
431         "port" : msb_reg.get("port", "80"),
432         "version" : msb_reg.get("version", "v1"),
433         "url" : msb_reg.get("url_path", "/v1"),
434         "protocol" : "REST",
435         "enable_ssl" : msb_reg.get("uses_ssl", False),
436         "visualRange" : "1"
437 }
438
439 @wrap_error_handling_start
440 @merge_inputs_for_start
441 @monkeypatch_loggers
442 @operation
443 def create_and_start_container_for_components(**start_inputs):
444     """Initiate Kubernetes deployment for service components
445
446     This operation method is to be used with the ContainerizedServiceComponent
447     node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
448     that the app is up and responding successfully to readiness probes.
449     """
450     _done_for_start(
451             **_verify_component(
452                 **_create_and_start_component(
453                     **_parse_cloudify_context(**start_inputs))))
454
455 @wrap_error_handling_start
456 @monkeypatch_loggers
457 @operation
458 def create_and_start_container_for_platforms(**kwargs):
459     """Initiate Kubernetes deployment for platform components
460
461     This operation method is to be used with the ContainerizedPlatformComponent
462     node type.
463     """
464     # Capture node properties
465     image = ctx.node.properties["image"]
466     docker_config = ctx.node.properties.get("docker_config", {})
467     resource_config = ctx.node.properties.get("resource_config", {})
468     kwargs["resource_config"] = resource_config
469     if "healthcheck" in docker_config:
470         kwargs["readiness"] = docker_config["healthcheck"]
471     if "livehealthcheck" in docker_config:
472         kwargs["liveness"] = docker_config["livehealthcheck"]
473     if "dns_name" in ctx.node.properties:
474         service_component_name = ctx.node.properties["dns_name"]
475     else:
476         service_component_name = ctx.node.properties["name"]
477
478     # Set some labels for the Kubernetes pods
479     # The name segment is required and must be 63 characters or less
480     kwargs["labels"] = {
481         "cfydeployment" : ctx.deployment.id,
482         "cfynode": ctx.node.name[:63],
483         "cfynodeinstance": ctx.instance.id[:63]
484     }
485
486     host_port = ctx.node.properties["host_port"]
487     container_port = ctx.node.properties["container_port"]
488
489     # Cloudify properties are all required and Cloudify complains that None
490     # is not a valid type for integer. Defaulting to 0 to indicate to not
491     # use this and not to set a specific port mapping in cases like service
492     # change handler.
493     if container_port != 0:
494         # Doing this because other nodes might want to use this property
495         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
496         ports = kwargs.get("ports", []) + [ port_mapping ]
497         kwargs["ports"] = ports
498     if "ports" not in kwargs:
499         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
500
501     # All of the new node properties could be handled more DRYly!
502     # If a registration to MSB is required, then set up the registration info
503     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
504         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
505
506     # If centralized logging via ELK is desired, then set up the logging info
507     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
508         kwargs["log_info"] = ctx.node.properties["log_info"]
509
510     # Pick up TLS info if present
511     if "tls_info" in ctx.node.properties:
512         kwargs["tls_info"] = ctx.node.properties["tls_info"]
513
514     # Pick up replica count and always_pull_image flag
515     if "replicas" in ctx.node.properties:
516         kwargs["replicas"] = ctx.node.properties["replicas"]
517     if "always_pull_image" in ctx.node.properties:
518         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
519
520     # Pick up location
521     kwargs["k8s_location"] = _get_location()
522
523     returned_args = _create_and_start_container(service_component_name, image, **kwargs)
524
525     # Verify that the k8s deployment is ready
526     #   - Set service component name into kwargs
527     #   - max_wait is already in kwargs if it was set
528     returned_args[SERVICE_COMPONENT_NAME] = service_component_name
529     _verify_component(**returned_args)
530
531 @wrap_error_handling_start
532 @monkeypatch_loggers
533 @operation
534 def create_and_start_container(**kwargs):
535     """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
536     service_component_name = ctx.node.properties["name"]
537     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
538
539     image = ctx.node.properties["image"]
540     kwargs["k8s_location"] = _get_location()
541
542     _create_and_start_container(service_component_name, image,**kwargs)
543
544 @monkeypatch_loggers
545 @operation
546 def stop_and_remove_container(**kwargs):
547     """Delete Kubernetes deployment"""
548     if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
549         try:
550             deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
551             k8sclient.undeploy(deployment_description)
552
553         except Exception as e:
554             ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
555                     .format(str(e)))
556     else:
557         # A previous install workflow may have failed,
558         # and no Kubernetes deployment info was recorded in runtime_properties.
559         # No need to run the undeploy operation
560         ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
561
562 @wrap_error_handling_update
563 @monkeypatch_loggers
564 @operation
565 def scale(replicas, **kwargs):
566     """Change number of replicas in the deployment"""
567     service_component_name = ctx.instance.runtime_properties["service_component_name"]
568
569     if replicas > 0:
570         current_replicas = ctx.instance.runtime_properties["replicas"]
571         ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
572         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
573         k8sclient.scale(deployment_description, replicas)
574         ctx.instance.runtime_properties["replicas"] = replicas
575
576         # Verify that the scaling took place as expected
577         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
578         ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
579         if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
580             ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
581
582     else:
583         ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
584
585 @wrap_error_handling_update
586 @monkeypatch_loggers
587 @operation
588 def update_image(image, **kwargs):
589     """ Restart component with a new Docker image """
590
591     service_component_name = ctx.instance.runtime_properties["service_component_name"]
592     if image:
593         current_image = ctx.instance.runtime_properties["image"]
594         ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
595         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
596         k8sclient.upgrade(deployment_description, image)
597         ctx.instance.runtime_properties["image"] = image
598
599         # Verify that the update took place as expected
600         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
601         ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
602         if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
603             ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
604
605     else:
606         ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
607
608 #TODO: implement rollback operation when kubernetes python client fix is available.
609 # (See comments in k8sclient.py.)
610 # In the meantime, it's possible to undo an update_image operation by doing a second
611 # update_image that specifies the older image.
612
613 @monkeypatch_loggers
614 @Policies.cleanup_policies_on_node
615 @operation
616 def cleanup_discovery(**kwargs):
617     """Delete configuration from Consul"""
618     if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
619         service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
620
621         try:
622             conn = dis.create_kv_conn(CONSUL_HOST)
623             dis.remove_service_component_config(conn, service_component_name)
624         except dis.DiscoveryConnectionError as e:
625             raise RecoverableError(e)
626     else:
627         # When another node in the blueprint fails install,
628         # this node may not have generated a service component name.
629         # There's nothing to delete from Consul.
630         ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
631
632 def _notify_container(**kwargs):
633     """
634     Notify container using the policy section in the docker_config.
635     Notification consists of running a script in the application container
636     in each pod in the Kubernetes deployment associated with this node.
637     Return the list of notification results.
638     """
639     dc = kwargs["docker_config"]
640     resp = []
641
642     if "policy" in dc and dc["policy"].get("trigger_type") == "docker":
643         # Build the command to execute in the container
644         # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
645         script_path = dc["policy"]["script_path"]
646         policy_data = {
647             "policies": kwargs["policies"],
648             "updated_policies": kwargs["updated_policies"],
649             "removed_policies": kwargs["removed_policies"]
650         }
651
652         command = [script_path, "policies", json.dumps(policy_data)]
653
654         # Execute the command
655         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
656         resp = k8sclient.execute_command_in_deployment(deployment_description, command)
657
658     # else the default is no trigger
659
660     return resp
661
662 @operation
663 @monkeypatch_loggers
664 @Policies.update_policies_on_node()
665 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
666     """Policy update task
667
668     This method is responsible for updating the application configuration and
669     notifying the applications that the change has occurred. This is to be used
670     for the dcae.interfaces.policy.policy_update operation.
671
672     :updated_policies: contains the list of changed policy-configs when configs_only=True
673         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
674     """
675     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
676     ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
677         .format(service_component_name, updated_policies, removed_policies, policies))
678     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
679     update_inputs["updated_policies"] = updated_policies
680     update_inputs["removed_policies"] = removed_policies
681     update_inputs["policies"] = policies
682
683     resp = _notify_container(**update_inputs)
684     ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))