21f70c12e47d2d5c89d38bbd9e410f5f37d1c3b9
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19
20 # Lifecycle interface calls for containerized components
21
22 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
23 from . import cloudify_importer
24
25 import time, copy
26 import json
27 from cloudify import ctx
28 from cloudify.decorators import operation
29 from cloudify.exceptions import NonRecoverableError, RecoverableError
30 from onap_dcae_dcaepolicy_lib import Policies
31 from k8splugin import discovery as dis
32 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
33     merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
34 from k8splugin.exceptions import DockerPluginDeploymentError
35 from k8splugin import utils
36 from configure import configure
37 import k8sclient
38
39 # Get configuration
40 plugin_conf = configure.configure()
41 CONSUL_HOST = plugin_conf.get("consul_host")
42 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
43 DCAE_NAMESPACE = plugin_conf.get("namespace")
44 DEFAULT_MAX_WAIT = plugin_conf.get("max_wait")
45 DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
46 COMPONENT_CERT_DIR = plugin_conf.get("tls",{}).get("component_cert_dir")
47 CBS_BASE_URL = plugin_conf.get("cbs").get("base_url")
48
49 # Used to construct delivery urls for data router subscribers. Data router in FTL
50 # requires https but this author believes that ONAP is to be defaulted to http.
51 DEFAULT_SCHEME = "http"
52
53 # Property keys
54 SERVICE_COMPONENT_NAME = "service_component_name"
55 CONTAINER_ID = "container_id"
56 APPLICATION_CONFIG = "application_config"
57 K8S_DEPLOYMENT = "k8s_deployment"
58 RESOURCE_KW = "resource_config"
59 LOCATION_ID = "location_id"
60
61 # Utility methods
62
63 # Lifecycle interface calls for dcae.nodes.DockerContainer
64
65 def _setup_for_discovery(**kwargs):
66     """Setup for config discovery"""
67     try:
68         name = kwargs['name']
69         application_config = kwargs[APPLICATION_CONFIG]
70
71         # NOTE: application_config is no longer a json string and is inputed as a
72         # YAML map which translates to a dict. We don't have to do any
73         # preprocessing anymore.
74         conn = dis.create_kv_conn(CONSUL_HOST)
75         dis.push_service_component_config(conn, name, application_config)
76         return kwargs
77     except dis.DiscoveryConnectionError as e:
78         raise RecoverableError(e)
79     except Exception as e:
80         ctx.logger.error("Unexpected error while pushing configuration: {0}"
81                 .format(str(e)))
82         raise NonRecoverableError(e)
83
84 def _generate_component_name(**kwargs):
85     """Generate component name"""
86     service_component_type = kwargs['service_component_type']
87     name_override = kwargs['service_component_name_override']
88
89     kwargs['name'] = name_override if name_override \
90             else dis.generate_service_component_name(service_component_type)
91     return kwargs
92
93 def _done_for_create(**kwargs):
94     """Wrap up create operation"""
95     name = kwargs['name']
96     kwargs[SERVICE_COMPONENT_NAME] = name
97     # All updates to the runtime_properties happens here. I don't see a reason
98     # why we shouldn't do this because the context is not being mutated by
99     # something else and will keep the other functions pure (pure in the sense
100     # not dealing with CloudifyContext).
101     ctx.instance.runtime_properties.update(kwargs)
102     ctx.logger.info("Done setting up: {0}".format(name))
103     return kwargs
104
105 def _get_resources(**kwargs):
106     if kwargs is not None:
107         ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
108         return kwargs.get(RESOURCE_KW)
109     ctx.logger.info("set resources to None")
110     return None
111
112 def  _get_location():
113     ''' Get the k8s location property.  Set to the default if the property is missing, None, or zero-length '''
114     return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
115         else DEFAULT_K8S_LOCATION
116
117 @merge_inputs_for_create
118 @monkeypatch_loggers
119 @Policies.gather_policies_to_node()
120 @operation
121 def create_for_components(**create_inputs):
122     """Create step for service components
123
124     This interface is responsible for:
125
126     1. Generating service component name
127     2. Populating config information into Consul
128     """
129     _done_for_create(
130             **_setup_for_discovery(
131                 **_enhance_docker_params(
132                     **_generate_component_name(
133                         **create_inputs))))
134
135
136 def _parse_streams(**kwargs):
137     """Parse streams and setup for DMaaP plugin"""
138     # The DMaaP plugin requires this plugin to set the runtime properties
139     # keyed by the node name.
140     for stream in kwargs["streams_publishes"]:
141         kwargs[stream["name"]] = stream
142
143     for stream in kwargs["streams_subscribes"]:
144         if stream["type"] == "data_router":
145
146             # Don't want to mutate the source
147             stream = copy.deepcopy(stream)
148
149             # Set up the delivery URL
150             # Using service_component_name as the host name in the subscriber URL
151             # will work in a single-cluster ONAP deployment.  Whether it will also work
152             # in a multi-cluster ONAP deployment--with a central location and one or
153             # more remote ("edge") locations depends on how networking and DNS is set
154             # up in a multi-cluster deployment
155             service_component_name = kwargs["name"]
156             ports, _ = k8sclient.parse_ports(kwargs["ports"])
157             dport, _ = ports[0]
158             subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
159
160             scheme = stream.get("scheme", DEFAULT_SCHEME)
161             if "route" not in stream:
162                 raise NonRecoverableError("'route' key missing from data router subscriber")
163             path = stream["route"]
164             stream["delivery_url"] = "{scheme}://{host}/{path}".format(
165                     scheme=scheme, host=subscriber_host, path=path)
166
167             # If username and password has not been provided then generate it. The
168             # DMaaP plugin doesn't generate for subscribers. The generation code
169             # and length of username password has been lifted from the DMaaP
170             # plugin.
171             if not stream.get("username", None):
172                 stream["username"] = utils.random_string(8)
173             if not stream.get("password", None):
174                 stream["password"] = utils.random_string(10)
175
176         kwargs[stream["name"]] = stream
177
178     return kwargs
179
180 @merge_inputs_for_create
181 @monkeypatch_loggers
182 @Policies.gather_policies_to_node()
183 @operation
184 def create_for_components_with_streams(**create_inputs):
185     """Create step for service components that use DMaaP
186
187     This interface is responsible for:
188
189     1. Generating service component name
190     2. Setup runtime properties for DMaaP plugin
191     3. Populating application config into Consul
192     """
193     _done_for_create(
194             **_setup_for_discovery(
195                 **_parse_streams(
196                     **_enhance_docker_params(
197                         **_generate_component_name(
198                             **create_inputs)))))
199
200 @merge_inputs_for_create
201 @monkeypatch_loggers
202 @operation
203 def create_for_platforms(**create_inputs):
204     """Create step for platform components
205
206     This interface is responible for:
207
208     1. Populating config information into Consul
209     """
210     _done_for_create(
211             **_setup_for_discovery(
212                 **create_inputs))
213
214 def _verify_k8s_deployment(location, service_component_name, max_wait):
215     """Verify that the k8s Deployment is ready
216
217     Args:
218     -----
219     location (string): location of the k8s cluster where the component was deployed
220     service_component_name: component's service component name
221     max_wait (integer): limit to how may attempts to make which translates to
222         seconds because each sleep is one second. 0 means infinite.
223
224     Return:
225     -------
226     True if deployment is ready within the maximum wait time, False otherwise
227     """
228     num_attempts = 1
229
230     while True:
231         if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
232             return True
233         else:
234             num_attempts += 1
235
236             if max_wait > 0 and max_wait < num_attempts:
237                 return False
238
239             time.sleep(1)
240
241     return True
242
243 def _create_and_start_container(container_name, image, **kwargs):
244     '''
245     This will create a k8s Deployment and, if needed, a k8s Service or two.
246     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
247     We're not exposing k8s to the component developer and the blueprint author.
248     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
249     the details from the component developer and the blueprint author.)
250
251     kwargs may have:
252         - volumes:  array of volume objects, where a volume object is:
253             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
254         - ports: array of strings in the form "container_port:host_port"
255         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
256         - always_pull: boolean.  If true, sets image pull policy to "Always"
257           so that a fresh copy of the image is always pull.  Otherwise, sets
258           image pull policy to "IfNotPresent"
259         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
260         - log_info: an object with info for setting up ELK logging, with the form:
261             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
262         - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
263             {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" }
264         - replicas: number of replicas to be launched initially
265         - readiness: object with information needed to create a readiness check
266         - liveness: object with information needed to create a liveness check
267         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
268     '''
269     tls_info = kwargs.get("tls_info") or {}
270     cert_dir = tls_info.get("cert_directory") or COMPONENT_CERT_DIR
271     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
272             "CONFIG_BINDING_SERVICE": "config-binding-service",
273             "DCAE_CA_CERTPATH" : "{0}/cacert.pem".format(cert_dir),
274             "CBS_CONFIG_URL" : "{0}/{1}".format(CBS_BASE_URL, container_name)
275           }
276     env.update(kwargs.get("envs", {}))
277     ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
278     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
279     replicas = kwargs.get("replicas", 1)
280     resource_config = _get_resources(**kwargs)
281     _, dep = k8sclient.deploy(DCAE_NAMESPACE,
282                      container_name,
283                      image,
284                      replicas=replicas,
285                      always_pull=kwargs.get("always_pull_image", False),
286                      k8sconfig=plugin_conf,
287                      resources=resource_config,
288                      volumes=kwargs.get("volumes", []),
289                      ports=kwargs.get("ports", []),
290                      msb_list=kwargs.get("msb_list"),
291                      tls_info=kwargs.get("tls_info"),
292                      env=env,
293                      labels=kwargs.get("labels", {}),
294                      log_info=kwargs.get("log_info"),
295                      readiness=kwargs.get("readiness"),
296                      liveness=kwargs.get("liveness"),
297                      k8s_location=kwargs.get("k8s_location"))
298
299     # Capture the result of deployment for future use
300     ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
301     kwargs[K8S_DEPLOYMENT] = dep
302     ctx.instance.runtime_properties["replicas"] = replicas
303     ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
304     return kwargs
305
306 def _parse_cloudify_context(**kwargs):
307     """Parse Cloudify context
308
309     Extract what is needed. This is impure function because it requires ctx.
310     """
311     kwargs["deployment_id"] = ctx.deployment.id
312
313     # Set some labels for the Kubernetes pods
314     # The name segment is required and must be 63 characters or less
315     kwargs["labels"] = {
316         "cfydeployment" : ctx.deployment.id,
317         "cfynode": ctx.node.name[:63],
318         "cfynodeinstance": ctx.instance.id[:63]
319     }
320
321     # Pick up the centralized logging info
322     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
323         kwargs["log_info"] = ctx.node.properties["log_info"]
324
325     # Pick up TLS info if present
326     if "tls_info" in ctx.node.properties:
327         kwargs["tls_info"] = ctx.node.properties["tls_info"]
328
329     # Pick up replica count and always_pull_image flag
330     if "replicas" in ctx.node.properties:
331         kwargs["replicas"] = ctx.node.properties["replicas"]
332     if "always_pull_image" in ctx.node.properties:
333         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
334
335     # Pick up location
336     kwargs["k8s_location"] = _get_location()
337
338     return kwargs
339
340 def _enhance_docker_params(**kwargs):
341     '''
342     Set up Docker environment variables and readiness/liveness check info
343     and inject into kwargs.
344     '''
345
346     # Get info for setting up readiness/liveness probe, if present
347     docker_config = kwargs.get("docker_config", {})
348     if "healthcheck" in docker_config:
349         kwargs["readiness"] = docker_config["healthcheck"]
350     if "livehealthcheck" in docker_config:
351         kwargs["liveness"] = docker_config["livehealthcheck"]
352
353     envs = kwargs.get("envs", {})
354
355     # Set tags on this component for its Consul registration as a service
356     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
357     tags = [ str(tag) for tag in tags if tag is not None ]
358     # Registrator will use this to register this component with tags. Must be
359     # comma delimited.
360     envs["SERVICE_TAGS"] = ",".join(tags)
361
362     kwargs["envs"] = envs
363
364     def combine_params(key, docker_config, kwargs):
365         v = docker_config.get(key, []) + kwargs.get(key, [])
366         kwargs[key] = v
367         return kwargs
368
369     # Add the lists of ports and volumes unintelligently - meaning just add the
370     # lists together with no deduping.
371     kwargs = combine_params("ports", docker_config, kwargs)
372     kwargs = combine_params("volumes", docker_config, kwargs)
373
374
375     return kwargs
376
377 def _create_and_start_component(**kwargs):
378     """Create and start component (container)"""
379     image = kwargs["image"]
380     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
381     # Need to be picky and manually select out pieces because just using kwargs
382     # which contains everything confused the execution of
383     # _create_and_start_container because duplicate variables exist
384     sub_kwargs = {
385         "volumes": kwargs.get("volumes", []),
386         "ports": kwargs.get("ports", None),
387         "envs": kwargs.get("envs", {}),
388         "log_info": kwargs.get("log_info", {}),
389         "tls_info": kwargs.get("tls_info", {}),
390         "labels": kwargs.get("labels", {}),
391         "resource_config": kwargs.get("resource_config",{}),
392         "readiness": kwargs.get("readiness",{}),
393         "liveness": kwargs.get("liveness",{}),
394         "k8s_location": kwargs.get("k8s_location")}
395     returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
396     kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]
397
398     return kwargs
399
400 def _verify_component(**kwargs):
401     """Verify deployment is ready"""
402     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
403
404     max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
405     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
406
407     if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
408         ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
409     else:
410         # The component did not become ready within the "max_wait" interval.
411         # Delete the k8s components created already and remove configuration from Consul.
412         ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
413         if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
414             ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
415             k8sclient.undeploy(kwargs[K8S_DEPLOYMENT])
416             ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
417         cleanup_discovery(**kwargs)
418         raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
419
420     return kwargs
421
422 def _done_for_start(**kwargs):
423     ctx.instance.runtime_properties.update(kwargs)
424     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
425     return kwargs
426
427 def _setup_msb_registration(service_name, msb_reg):
428     return {
429         "serviceName" : service_name,
430         "port" : msb_reg.get("port", "80"),
431         "version" : msb_reg.get("version", "v1"),
432         "url" : msb_reg.get("url_path", "/v1"),
433         "protocol" : "REST",
434         "enable_ssl" : msb_reg.get("uses_ssl", False),
435         "visualRange" : "1"
436 }
437
438 @wrap_error_handling_start
439 @merge_inputs_for_start
440 @monkeypatch_loggers
441 @operation
442 def create_and_start_container_for_components(**start_inputs):
443     """Initiate Kubernetes deployment for service components
444
445     This operation method is to be used with the ContainerizedServiceComponent
446     node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
447     that the app is up and responding successfully to readiness probes.
448     """
449     _done_for_start(
450             **_verify_component(
451                 **_create_and_start_component(
452                     **_parse_cloudify_context(**start_inputs))))
453
454 @wrap_error_handling_start
455 @monkeypatch_loggers
456 @operation
457 def create_and_start_container_for_platforms(**kwargs):
458     """Initiate Kubernetes deployment for platform components
459
460     This operation method is to be used with the ContainerizedPlatformComponent
461     node type.
462     """
463     # Capture node properties
464     image = ctx.node.properties["image"]
465     docker_config = ctx.node.properties.get("docker_config", {})
466     resource_config = ctx.node.properties.get("resource_config", {})
467     kwargs["resource_config"] = resource_config
468     if "healthcheck" in docker_config:
469         kwargs["readiness"] = docker_config["healthcheck"]
470     if "livehealthcheck" in docker_config:
471         kwargs["liveness"] = docker_config["livehealthcheck"]
472     if "dns_name" in ctx.node.properties:
473         service_component_name = ctx.node.properties["dns_name"]
474     else:
475         service_component_name = ctx.node.properties["name"]
476
477     # Set some labels for the Kubernetes pods
478     # The name segment is required and must be 63 characters or less
479     kwargs["labels"] = {
480         "cfydeployment" : ctx.deployment.id,
481         "cfynode": ctx.node.name[:63],
482         "cfynodeinstance": ctx.instance.id[:63]
483     }
484
485     host_port = ctx.node.properties["host_port"]
486     container_port = ctx.node.properties["container_port"]
487
488     # Cloudify properties are all required and Cloudify complains that None
489     # is not a valid type for integer. Defaulting to 0 to indicate to not
490     # use this and not to set a specific port mapping in cases like service
491     # change handler.
492     if container_port != 0:
493         # Doing this because other nodes might want to use this property
494         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
495         ports = kwargs.get("ports", []) + [ port_mapping ]
496         kwargs["ports"] = ports
497     if "ports" not in kwargs:
498         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
499
500     # All of the new node properties could be handled more DRYly!
501     # If a registration to MSB is required, then set up the registration info
502     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
503         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
504
505     # If centralized logging via ELK is desired, then set up the logging info
506     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
507         kwargs["log_info"] = ctx.node.properties["log_info"]
508
509     # Pick up TLS info if present
510     if "tls_info" in ctx.node.properties:
511         kwargs["tls_info"] = ctx.node.properties["tls_info"]
512
513     # Pick up replica count and always_pull_image flag
514     if "replicas" in ctx.node.properties:
515         kwargs["replicas"] = ctx.node.properties["replicas"]
516     if "always_pull_image" in ctx.node.properties:
517         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
518
519     # Pick up location
520     kwargs["k8s_location"] = _get_location()
521
522     returned_args = _create_and_start_container(service_component_name, image, **kwargs)
523
524     # Verify that the k8s deployment is ready
525     #   - Set service component name into kwargs
526     #   - max_wait is already in kwargs if it was set
527     returned_args[SERVICE_COMPONENT_NAME] = service_component_name
528     _verify_component(**returned_args)
529
530 @wrap_error_handling_start
531 @monkeypatch_loggers
532 @operation
533 def create_and_start_container(**kwargs):
534     """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
535     service_component_name = ctx.node.properties["name"]
536     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
537
538     image = ctx.node.properties["image"]
539     kwargs["k8s_location"] = _get_location()
540
541     _create_and_start_container(service_component_name, image,**kwargs)
542
543 @monkeypatch_loggers
544 @operation
545 def stop_and_remove_container(**kwargs):
546     """Delete Kubernetes deployment"""
547     if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
548         try:
549             deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
550             k8sclient.undeploy(deployment_description)
551
552         except Exception as e:
553             ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
554                     .format(str(e)))
555     else:
556         # A previous install workflow may have failed,
557         # and no Kubernetes deployment info was recorded in runtime_properties.
558         # No need to run the undeploy operation
559         ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
560
561 @wrap_error_handling_update
562 @monkeypatch_loggers
563 @operation
564 def scale(replicas, **kwargs):
565     """Change number of replicas in the deployment"""
566     service_component_name = ctx.instance.runtime_properties["service_component_name"]
567
568     if replicas > 0:
569         current_replicas = ctx.instance.runtime_properties["replicas"]
570         ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
571         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
572         k8sclient.scale(deployment_description, replicas)
573         ctx.instance.runtime_properties["replicas"] = replicas
574
575         # Verify that the scaling took place as expected
576         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
577         ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
578         if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
579             ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
580
581     else:
582         ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
583
584 @wrap_error_handling_update
585 @monkeypatch_loggers
586 @operation
587 def update_image(image, **kwargs):
588     """ Restart component with a new Docker image """
589
590     service_component_name = ctx.instance.runtime_properties["service_component_name"]
591     if image:
592         current_image = ctx.instance.runtime_properties["image"]
593         ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
594         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
595         k8sclient.upgrade(deployment_description, image)
596         ctx.instance.runtime_properties["image"] = image
597
598         # Verify that the update took place as expected
599         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
600         ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
601         if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
602             ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
603
604     else:
605         ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
606
607 #TODO: implement rollback operation when kubernetes python client fix is available.
608 # (See comments in k8sclient.py.)
609 # In the meantime, it's possible to undo an update_image operation by doing a second
610 # update_image that specifies the older image.
611
612 @monkeypatch_loggers
613 @Policies.cleanup_policies_on_node
614 @operation
615 def cleanup_discovery(**kwargs):
616     """Delete configuration from Consul"""
617     if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
618         service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
619
620         try:
621             conn = dis.create_kv_conn(CONSUL_HOST)
622             dis.remove_service_component_config(conn, service_component_name)
623         except dis.DiscoveryConnectionError as e:
624             raise RecoverableError(e)
625     else:
626         # When another node in the blueprint fails install,
627         # this node may not have generated a service component name.
628         # There's nothing to delete from Consul.
629         ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
630
631 def _notify_container(**kwargs):
632     """
633     Notify container using the policy section in the docker_config.
634     Notification consists of running a script in the application container
635     in each pod in the Kubernetes deployment associated with this node.
636     Return the list of notification results.
637     """
638     dc = kwargs["docker_config"]
639     resp = []
640
641     if "policy" in dc and dc["policy"].get("trigger_type") == "docker":
642         # Build the command to execute in the container
643         # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
644         script_path = dc["policy"]["script_path"]
645         policy_data = {
646             "policies": kwargs["policies"],
647             "updated_policies": kwargs["updated_policies"],
648             "removed_policies": kwargs["removed_policies"]
649         }
650
651         command = [script_path, "policies", json.dumps(policy_data)]
652
653         # Execute the command
654         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
655         resp = k8sclient.execute_command_in_deployment(deployment_description, command)
656
657     # else the default is no trigger
658
659     return resp
660
661 @operation
662 @monkeypatch_loggers
663 @Policies.update_policies_on_node()
664 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
665     """Policy update task
666
667     This method is responsible for updating the application configuration and
668     notifying the applications that the change has occurred. This is to be used
669     for the dcae.interfaces.policy.policy_update operation.
670
671     :updated_policies: contains the list of changed policy-configs when configs_only=True
672         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
673     """
674     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
675     ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
676         .format(service_component_name, updated_policies, removed_policies, policies))
677     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
678     update_inputs["updated_policies"] = updated_policies
679     update_inputs["removed_policies"] = removed_policies
680     update_inputs["policies"] = policies
681
682     resp = _notify_container(**update_inputs)
683     ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))