Add resource_config to specify CPU and menory
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # Lifecycle interface calls for containerized components
22
23 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
24 import cloudify_importer
25
26 import time, copy
27 import json
28 from cloudify import ctx
29 from cloudify.decorators import operation
30 from cloudify.exceptions import NonRecoverableError, RecoverableError
31 from onap_dcae_dcaepolicy_lib import Policies
32 from k8splugin import discovery as dis
33 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
34     merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
35 from k8splugin.exceptions import DockerPluginDeploymentError
36 from k8splugin import utils
37 from configure import configure
38 import k8sclient
39
40 # Get configuration
41 plugin_conf = configure.configure()
42 CONSUL_HOST = plugin_conf.get("consul_host")
43 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
44 DCAE_NAMESPACE = plugin_conf.get("namespace")
45 DEFAULT_MAX_WAIT = plugin_conf.get("max_wait", 1800)
46
47 # Used to construct delivery urls for data router subscribers. Data router in FTL
48 # requires https but this author believes that ONAP is to be defaulted to http.
49 DEFAULT_SCHEME = "http"
50
51 # Property keys
52 SERVICE_COMPONENT_NAME = "service_component_name"
53 CONTAINER_ID = "container_id"
54 APPLICATION_CONFIG = "application_config"
55 K8S_DEPLOYMENT = "k8s_deployment"
56 RESOURCE_KW = "resource_config"
57
58 # Utility methods
59
60 # Lifecycle interface calls for dcae.nodes.DockerContainer
61
62 def _setup_for_discovery(**kwargs):
63     """Setup for config discovery"""
64     try:
65         name = kwargs['name']
66         application_config = kwargs[APPLICATION_CONFIG]
67
68         # NOTE: application_config is no longer a json string and is inputed as a
69         # YAML map which translates to a dict. We don't have to do any
70         # preprocessing anymore.
71         conn = dis.create_kv_conn(CONSUL_HOST)
72         dis.push_service_component_config(conn, name, application_config)
73         return kwargs
74     except dis.DiscoveryConnectionError as e:
75         raise RecoverableError(e)
76     except Exception as e:
77         ctx.logger.error("Unexpected error while pushing configuration: {0}"
78                 .format(str(e)))
79         raise NonRecoverableError(e)
80
81 def _generate_component_name(**kwargs):
82     """Generate component name"""
83     service_component_type = kwargs['service_component_type']
84     name_override = kwargs['service_component_name_override']
85
86     kwargs['name'] = name_override if name_override \
87             else dis.generate_service_component_name(service_component_type)
88     return kwargs
89
90 def _done_for_create(**kwargs):
91     """Wrap up create operation"""
92     name = kwargs['name']
93     kwargs[SERVICE_COMPONENT_NAME] = name
94     # All updates to the runtime_properties happens here. I don't see a reason
95     # why we shouldn't do this because the context is not being mutated by
96     # something else and will keep the other functions pure (pure in the sense
97     # not dealing with CloudifyContext).
98     ctx.instance.runtime_properties.update(kwargs)
99     ctx.logger.info("Done setting up: {0}".format(name))
100     return kwargs
101
102 def _get_resources(**kwargs):
103     if kwargs is not None:
104         ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
105         return kwargs.get(RESOURCE_KW)
106     ctx.logger.info("set resources to None")
107     return None
108
109 @merge_inputs_for_create
110 @monkeypatch_loggers
111 @Policies.gather_policies_to_node()
112 @operation
113 def create_for_components(**create_inputs):
114     """Create step for service components
115
116     This interface is responsible for:
117
118     1. Generating service component name
119     2. Populating config information into Consul
120     """
121     _done_for_create(
122             **_setup_for_discovery(
123                 **_generate_component_name(
124                     **create_inputs)))
125
126
127 def _parse_streams(**kwargs):
128     """Parse streams and setup for DMaaP plugin"""
129     # The DMaaP plugin requires this plugin to set the runtime properties
130     # keyed by the node name.
131     def setup_publishes(s):
132         kwargs[s["name"]] = s
133
134     map(setup_publishes, kwargs["streams_publishes"])
135
136     def setup_subscribes(s):
137         if s["type"] == "data_router":
138             # If username and password has been provided then generate it. The
139             # DMaaP plugin doesn't generate for subscribers. The generation code
140             # and length of username password has been lifted from the DMaaP
141             # plugin.
142
143             # Don't want to mutate the source
144             s = copy.deepcopy(s)
145             if not s.get("username", None):
146                 s["username"] = utils.random_string(8)
147             if not s.get("password", None):
148                 s["password"] = utils.random_string(10)
149
150         kwargs[s["name"]] = s
151
152     # NOTE: That the delivery url is constructed and setup in the start operation
153     map(setup_subscribes, kwargs["streams_subscribes"])
154
155     return kwargs
156
157 def _setup_for_discovery_streams(**kwargs):
158     """Setup for discovery of streams
159
160     Specifically, there's a race condition this call addresses for data router
161     subscriber case. The component needs its feed subscriber information but the
162     DMaaP plugin doesn't provide this until after the docker plugin start
163     operation.
164     """
165     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
166             if s["type"] == "data_router"]
167
168     if dr_subs:
169         dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
170         conn = dis.create_kv_conn(CONSUL_HOST)
171
172         def add_feed(dr_sub):
173             # delivery url and subscriber id will be fill by the dmaap plugin later
174             v = { "location": dr_sub["location"], "delivery_url": None,
175                     "username": dr_sub["username"], "password": dr_sub["password"],
176                     "subscriber_id": None }
177             return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
178
179         try:
180             if all(map(add_feed, dr_subs)):
181                 return kwargs
182         except Exception as e:
183             raise NonRecoverableError(e)
184
185         # You should never get here
186         raise NonRecoverableError("Failure updating feed streams in Consul")
187     else:
188         return kwargs
189
190
191 @merge_inputs_for_create
192 @monkeypatch_loggers
193 @Policies.gather_policies_to_node()
194 @operation
195 def create_for_components_with_streams(**create_inputs):
196     """Create step for service components that use DMaaP
197
198     This interface is responsible for:
199
200     1. Generating service component name
201     2. Setup runtime properties for DMaaP plugin
202     3. Populating application config into Consul
203     4. Populating DMaaP config for data router subscribers in Consul
204     """
205     _done_for_create(
206             **_setup_for_discovery(
207                 **_setup_for_discovery_streams(
208                     **_parse_streams(
209                         **_generate_component_name(
210                             **create_inputs)))))
211
212
213 @merge_inputs_for_create
214 @monkeypatch_loggers
215 @operation
216 def create_for_platforms(**create_inputs):
217     """Create step for platform components
218
219     This interface is responible for:
220
221     1. Populating config information into Consul
222     """
223     _done_for_create(
224             **_setup_for_discovery(
225                 **create_inputs))
226
227
228 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
229         with_port=False):
230     conn = dis.create_kv_conn(consul_host)
231     results = dis.lookup_service(conn, service_component_name)
232
233     if with_port:
234         # Just grab first
235         result = results[0]
236         return "{address}:{port}".format(address=result["ServiceAddress"],
237                 port=result["ServicePort"])
238     else:
239         return results[0]["ServiceAddress"]
240
241
242 def _verify_k8s_deployment(service_component_name, max_wait):
243     """Verify that the k8s Deployment is ready
244
245     Args:
246     -----
247     max_wait (integer): limit to how may attempts to make which translates to
248         seconds because each sleep is one second. 0 means infinite.
249
250     Return:
251     -------
252     True if deployment is ready else a DockerPluginDeploymentError exception
253     will be raised.
254     """
255     num_attempts = 1
256
257     while True:
258         if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
259             return True
260         else:
261             num_attempts += 1
262
263             if max_wait > 0 and max_wait < num_attempts:
264                 raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
265
266             time.sleep(1)
267
268     return True
269
270 def _create_and_start_container(container_name, image, **kwargs):
271     '''
272     This will create a k8s Deployment and, if needed, a k8s Service or two.
273     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
274     We're not exposing k8s to the component developer and the blueprint author.
275     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
276     the details from the component developer and the blueprint author.)
277
278     kwargs may have:
279         - volumes:  array of volume objects, where a volume object is:
280             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
281         - ports: array of strings in the form "container_port:host_port"
282         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
283         - always_pull: boolean.  If true, sets image pull policy to "Always"
284           so that a fresh copy of the image is always pull.  Otherwise, sets
285           image pull policy to "IfNotPresent"
286         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
287         - log_info: an object with info for setting up ELK logging, with the form:
288             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
289         - replicas: number of replicas to be launched initially
290         - readiness: object with information needed to create a readiness check
291     '''
292     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
293             "CONFIG_BINDING_SERVICE": "config-binding-service" }
294     env.update(kwargs.get("envs", {}))
295     ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
296     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
297     replicas = kwargs.get("replicas", 1)
298     resource_config = _get_resources(**kwargs)
299     _,dep = k8sclient.deploy(DCAE_NAMESPACE,
300                      container_name,
301                      image,
302                      replicas = replicas,
303                      always_pull=kwargs.get("always_pull_image", False),
304                      k8sconfig=plugin_conf,
305                      resources = resource_config,
306                      volumes=kwargs.get("volumes",[]),
307                      ports=kwargs.get("ports",[]),
308                      msb_list=kwargs.get("msb_list"),
309                      tls_info=kwargs.get("tls_info"),
310                      env = env,
311                      labels = kwargs.get("labels", {}),
312                      log_info=kwargs.get("log_info"),
313                      readiness=kwargs.get("readiness"))
314
315     # Capture the result of deployment for future use
316     ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
317     ctx.instance.runtime_properties["replicas"] = replicas
318     ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
319
320 def _parse_cloudify_context(**kwargs):
321     """Parse Cloudify context
322
323     Extract what is needed. This is impure function because it requires ctx.
324     """
325     kwargs["deployment_id"] = ctx.deployment.id
326
327     # Set some labels for the Kubernetes pods
328     kwargs["labels"] = {
329         "cfydeployment" : ctx.deployment.id,
330         "cfynode": ctx.node.name,
331         "cfynodeinstance": ctx.instance.id
332     }
333
334         # Pick up the centralized logging info
335     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
336         kwargs["log_info"] = ctx.node.properties["log_info"]
337
338     # Pick up TLS info if present
339     if "tls_info" in ctx.node.properties:
340         kwargs["tls_info"] = ctx.node.properties["tls_info"]
341
342     # Pick up replica count and always_pull_image flag
343     if "replicas" in ctx.node.properties:
344         kwargs["replicas"] = ctx.node.properties["replicas"]
345     if "always_pull_image" in ctx.node.properties:
346         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
347
348     return kwargs
349
350 def _enhance_docker_params(**kwargs):
351     '''
352     Set up Docker environment variables and readiness check info
353     and inject into kwargs.
354     '''
355
356     # Get info for setting up readiness probe, if present
357     docker_config = kwargs.get("docker_config", {})
358     if "healthcheck" in docker_config:
359         kwargs["readiness"] = docker_config["healthcheck"]
360
361     envs = kwargs.get("envs", {})
362
363     # Set tags on this component for its Consul registration as a service
364     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
365     tags = [ str(tag) for tag in tags if tag is not None ]
366     # Registrator will use this to register this component with tags. Must be
367     # comma delimited.
368     envs["SERVICE_TAGS"] = ",".join(tags)
369
370     kwargs["envs"] = envs
371
372     def combine_params(key, docker_config, kwargs):
373         v = docker_config.get(key, []) + kwargs.get(key, [])
374         if v:
375             kwargs[key] = v
376         return kwargs
377
378     # Add the lists of ports and volumes unintelligently - meaning just add the
379     # lists together with no deduping.
380     kwargs = combine_params("ports", docker_config, kwargs)
381     kwargs = combine_params("volumes", docker_config, kwargs)
382
383
384     return kwargs
385
386 def _create_and_start_component(**kwargs):
387     """Create and start component (container)"""
388     image = kwargs["image"]
389     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
390     # Need to be picky and manually select out pieces because just using kwargs
391     # which contains everything confused the execution of
392     # _create_and_start_container because duplicate variables exist
393     sub_kwargs = {
394         "volumes": kwargs.get("volumes", []),
395         "ports": kwargs.get("ports", None),
396         "envs": kwargs.get("envs", {}),
397         "log_info": kwargs.get("log_info", {}),
398         "tls_info": kwargs.get("tls_info", {}),
399         "labels": kwargs.get("labels", {}),
400         "resource_config": kwargs.get("resource_config",{}),
401         "readiness": kwargs.get("readiness",{})}
402     _create_and_start_container(service_component_name, image, **sub_kwargs)
403
404     return kwargs
405
406 def _verify_component(**kwargs):
407     """Verify deployment is ready"""
408     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
409
410     max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
411     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
412
413     if _verify_k8s_deployment(service_component_name, max_wait):
414         ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
415
416     return kwargs
417
418 def _done_for_start(**kwargs):
419     ctx.instance.runtime_properties.update(kwargs)
420     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
421     return kwargs
422
423 def _setup_msb_registration(service_name, msb_reg):
424     return {
425         "serviceName" : service_name,
426         "port" : msb_reg.get("port", "80"),
427         "version" : msb_reg.get("version", "v1"),
428         "url" : msb_reg.get("url_path", "/v1"),
429         "protocol" : "REST",
430         "enable_ssl" : msb_reg.get("uses_ssl", False),
431         "visualRange" : "1"
432 }
433
434 @wrap_error_handling_start
435 @merge_inputs_for_start
436 @monkeypatch_loggers
437 @operation
438 def create_and_start_container_for_components(**start_inputs):
439     """Initiate Kubernetes deployment for service components
440
441     This operation method is to be used with the ContainerizedServiceComponent
442     node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
443     that the app is up and responding successfully to readiness probes.
444     """
445     _done_for_start(
446             **_verify_component(
447                 **_create_and_start_component(
448                     **_enhance_docker_params(
449                         **_parse_cloudify_context(**start_inputs)))))
450
451
452 def _update_delivery_url(**kwargs):
453     """Update the delivery url for data router subscribers"""
454     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
455             if s["type"] == "data_router"]
456
457     if dr_subs:
458         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
459         # TODO: Should NOT be setting up the delivery url with ip addresses
460         # because in the https case, this will not work because data router does
461         # a certificate validation using the fqdn.
462         subscriber_host = _lookup_service(service_component_name, with_port=True)
463
464         for dr_sub in dr_subs:
465             scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
466             if "route" not in dr_sub:
467                 raise NonRecoverableError("'route' key missing from data router subscriber")
468             path = dr_sub["route"]
469             dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
470                     scheme=scheme, host=subscriber_host, path=path)
471             kwargs[dr_sub["name"]] = dr_sub
472
473     return kwargs
474
475 @wrap_error_handling_start
476 @merge_inputs_for_start
477 @monkeypatch_loggers
478 @operation
479 def create_and_start_container_for_components_with_streams(**start_inputs):
480     """Initiate Kubernetes deployment for service components that have streams
481
482     This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
483     node type. After initiating the Kubernetes deployment, the plugin will verify with
484     Kubernetes that the app is up and responding successfully to readiness probes.
485     """
486     _done_for_start(
487             **_update_delivery_url(
488                 **_verify_component(
489                     **_create_and_start_component(
490                         **_enhance_docker_params(
491                             **_parse_cloudify_context(**start_inputs))))))
492
493
494 @wrap_error_handling_start
495 @monkeypatch_loggers
496 @operation
497 def create_and_start_container_for_platforms(**kwargs):
498     """Initiate Kubernetes deployment for platform components
499
500     This operation method is to be used with the ContainerizedPlatformComponent
501     node type.
502     """
503     # Capture node properties
504     image = ctx.node.properties["image"]
505     docker_config = ctx.node.properties.get("docker_config", {})
506     resource_config = ctx.node.properties.get("resource_config", {})
507     kwargs["resource_config"] = resource_config
508     if "healthcheck" in docker_config:
509         kwargs["readiness"] = docker_config["healthcheck"]
510     if "dns_name" in ctx.node.properties:
511         service_component_name = ctx.node.properties["dns_name"]
512     else:
513         service_component_name = ctx.node.properties["name"]
514
515     # Set some labels for the Kubernetes pods
516     kwargs["labels"] = {
517         "cfydeployment" : ctx.deployment.id,
518         "cfynode": ctx.node.name,
519         "cfynodeinstance": ctx.instance.id
520     }
521
522     host_port = ctx.node.properties["host_port"]
523     container_port = ctx.node.properties["container_port"]
524
525     # Cloudify properties are all required and Cloudify complains that None
526     # is not a valid type for integer. Defaulting to 0 to indicate to not
527     # use this and not to set a specific port mapping in cases like service
528     # change handler.
529     if container_port != 0:
530         # Doing this because other nodes might want to use this property
531         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
532         ports = kwargs.get("ports", []) + [ port_mapping ]
533         kwargs["ports"] = ports
534     if "ports" not in kwargs:
535         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
536
537     # All of the new node properties could be handled more DRYly!
538     # If a registration to MSB is required, then set up the registration info
539     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
540         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
541
542     # If centralized logging via ELK is desired, then set up the logging info
543     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
544         kwargs["log_info"] = ctx.node.properties["log_info"]
545
546     # Pick up TLS info if present
547     if "tls_info" in ctx.node.properties:
548         kwargs["tls_info"] = ctx.node.properties["tls_info"]
549
550     # Pick up replica count and always_pull_image flag
551     if "replicas" in ctx.node.properties:
552         kwargs["replicas"] = ctx.node.properties["replicas"]
553     if "always_pull_image" in ctx.node.properties:
554         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
555     _create_and_start_container(service_component_name, image, **kwargs)
556
557     # Verify that the k8s deployment is ready
558
559     max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
560     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
561
562     if _verify_k8s_deployment(service_component_name, max_wait):
563         ctx.logger.info("k8s deployment ready for: {0}".format(service_component_name))
564
565
566 @wrap_error_handling_start
567 @monkeypatch_loggers
568 @operation
569 def create_and_start_container(**kwargs):
570     """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
571     service_component_name = ctx.node.properties["name"]
572     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
573
574     image = ctx.node.properties["image"]
575
576     _create_and_start_container(service_component_name, image,**kwargs)
577
578 @monkeypatch_loggers
579 @operation
580 def stop_and_remove_container(**kwargs):
581     """Delete Kubernetes deployment"""
582     if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
583         try:
584             deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
585             k8sclient.undeploy(deployment_description)
586
587         except Exception as e:
588             ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
589                     .format(str(e)))
590     else:
591         # A previous install workflow may have failed,
592         # and no Kubernetes deployment info was recorded in runtime_properties.
593         # No need to run the undeploy operation
594         ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
595
596 @wrap_error_handling_update
597 @monkeypatch_loggers
598 @operation
599 def scale(replicas, **kwargs):
600     """Change number of replicas in the deployment"""
601     service_component_name = ctx.instance.runtime_properties["service_component_name"]
602
603     if replicas > 0:
604         current_replicas = ctx.instance.runtime_properties["replicas"]
605         ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
606         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
607         k8sclient.scale(deployment_description, replicas)
608         ctx.instance.runtime_properties["replicas"] = replicas
609
610         # Verify that the scaling took place as expected
611         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
612         ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
613         if _verify_k8s_deployment(service_component_name, max_wait):
614             ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
615
616     else:
617         ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
618
619 @wrap_error_handling_update
620 @monkeypatch_loggers
621 @operation
622 def update_image(image, **kwargs):
623     """ Restart component with a new Docker image """
624
625     service_component_name = ctx.instance.runtime_properties["service_component_name"]
626     if image:
627         current_image = ctx.instance.runtime_properties["image"]
628         ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
629         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
630         k8sclient.upgrade(deployment_description, image)
631         ctx.instance.runtime_properties["image"] = image
632
633         # Verify that the update took place as expected
634         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
635         ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
636         if _verify_k8s_deployment(service_component_name, max_wait):
637             ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
638
639     else:
640         ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
641
642 #TODO: implement rollback operation when kubernetes python client fix is available.
643 # (See comments in k8sclient.py.)
644 # In the meantime, it's possible to undo an update_image operation by doing a second
645 # update_image that specifies the older image.
646
647 @monkeypatch_loggers
648 @Policies.cleanup_policies_on_node
649 @operation
650 def cleanup_discovery(**kwargs):
651     """Delete configuration from Consul"""
652     if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
653         service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
654
655         try:
656             conn = dis.create_kv_conn(CONSUL_HOST)
657             dis.remove_service_component_config(conn, service_component_name)
658         except dis.DiscoveryConnectionError as e:
659             raise RecoverableError(e)
660     else:
661         # When another node in the blueprint fails install,
662         # this node may not have generated a service component name.
663         # There's nothing to delete from Consul.
664         ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
665
666 def _notify_container(**kwargs):
667     """
668     Notify container using the policy section in the docker_config.
669     Notification consists of running a script in the application container
670     in each pod in the Kubernetes deployment associated with this node.
671     Return the list of notification results.
672     """
673     dc = kwargs["docker_config"]
674     resp = []
675
676     if "policy" in dc:
677         if dc["policy"]["trigger_type"] == "docker":
678
679              # Build the command to execute in the container
680              # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
681             script_path = dc["policy"]["script_path"]
682             policy_data = {
683                 "policies": kwargs["policies"],
684                 "updated_policies": kwargs["updated_policies"],
685                 "removed_policies": kwargs["removed_policies"]
686             }
687
688             command = [script_path, "policies", json.dumps(policy_data)]
689
690             # Execute the command
691             deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
692             resp = k8sclient.execute_command_in_deployment(deployment_description, command)
693
694     # else the default is no trigger
695
696     return resp
697
698 @operation
699 @monkeypatch_loggers
700 @Policies.update_policies_on_node()
701 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
702     """Policy update task
703
704     This method is responsible for updating the application configuration and
705     notifying the applications that the change has occurred. This is to be used
706     for the dcae.interfaces.policy.policy_update operation.
707
708     :updated_policies: contains the list of changed policy-configs when configs_only=True
709         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
710     """
711     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
712     ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
713         .format(service_component_name, updated_policies, removed_policies, policies))
714     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
715     update_inputs["updated_policies"] = updated_policies
716     update_inputs["removed_policies"] = removed_policies
717     update_inputs["policies"] = policies
718
719     resp = _notify_container(**update_inputs)
720     ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))