# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

# Lifecycle interface calls for containerized components

# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
import cloudify_importer

import time, copy
import json
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
import k8sclient

# Get configuration
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https, but this author believes that ONAP is meant to default to http.
DEFAULT_SCHEME = "http"

# Property keys
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"

# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Setup for config discovery"""
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]

        # NOTE: application_config is no longer a json string; it is input as a
        # YAML map, which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                .format(str(e)))
        raise NonRecoverableError(e)
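
# Illustrative note (not in the original source): given kwargs such as
# {"name": "s123-web", "application_config": {"collection_interval": 30}},
# _setup_for_discovery stores the application_config dict in Consul under the
# component's name, where it can later be resolved for the running container.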

def _generate_component_name(**kwargs):
    """Generate component name"""
    service_component_type = kwargs['service_component_type']
    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
            else dis.generate_service_component_name(service_component_type)
    return kwargs

def _done_for_create(**kwargs):
    """Wrap up create operation"""
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happen here. I don't see a reason
    # why we shouldn't do this because the context is not being mutated by
    # something else, and it keeps the other functions pure (pure in the sense
    # of not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for service components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_generate_component_name(
                **create_inputs)))

def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin"""
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    def setup_publishes(s):
        kwargs[s["name"]] = s

    map(setup_publishes, kwargs["streams_publishes"])

    def setup_subscribes(s):
        if s["type"] == "data_router":
            # If username and password have not been provided, then generate
            # them. The DMaaP plugin doesn't generate them for subscribers. The
            # generation code and the username/password lengths have been lifted
            # from the DMaaP plugin.

            # Don't want to mutate the source
            s = copy.deepcopy(s)
            if not s.get("username", None):
                s["username"] = utils.random_string(8)
            if not s.get("password", None):
                s["password"] = utils.random_string(10)

        kwargs[s["name"]] = s

    # NOTE: The delivery url is constructed and set up in the start operation
    map(setup_subscribes, kwargs["streams_subscribes"])

    return kwargs
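
# Illustrative note (not in the original source): after _parse_streams, kwargs is
# keyed by stream name so the DMaaP plugin can find each stream. For example, a
# subscriber declared with name "feed00" ends up as kwargs["feed00"] = {"name":
# "feed00", "type": "data_router", "username": "<8 chars>", "password":
# "<10 chars>", ...}, with credentials generated only when not already supplied.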

def _setup_for_discovery_streams(**kwargs):
    """Setup for discovery of streams

    Specifically, there's a race condition this call addresses for the data router
    subscriber case. The component needs its feed subscriber information but the
    DMaaP plugin doesn't provide this until after the docker plugin start
    operation.
    """
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
        conn = dis.create_kv_conn(CONSUL_HOST)

        def add_feed(dr_sub):
            # delivery url and subscriber id will be filled in by the dmaap plugin later
            v = { "location": dr_sub["location"], "delivery_url": None,
                    "username": dr_sub["username"], "password": dr_sub["password"],
                    "subscriber_id": None }
            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) is not None

        try:
            if all(map(add_feed, dr_subs)):
                return kwargs
        except Exception as e:
            raise NonRecoverableError(e)

        # You should never get here
        raise NonRecoverableError("Failure updating feed streams in Consul")
    else:
        return kwargs

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for service components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setting up runtime properties for the DMaaP plugin
    3. Populating application config into Consul
    4. Populating DMaaP config for data router subscribers in Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_setup_for_discovery_streams(
                **_parse_streams(
                    **_generate_component_name(
                        **create_inputs)))))

@merge_inputs_for_create
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
    """Create step for platform components

    This interface is responsible for:

    1. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **create_inputs))

def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
        with_port=False):
    conn = dis.create_kv_conn(consul_host)
    results = dis.lookup_service(conn, service_component_name)

    if with_port:
        # Just grab first
        result = results[0]
        return "{address}:{port}".format(address=result["ServiceAddress"],
                port=result["ServicePort"])
    else:
        return results[0]["ServiceAddress"]
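
# Illustrative note (not in the original source): given a hypothetical Consul
# record {"ServiceAddress": "10.42.0.5", "ServicePort": 8080}, _lookup_service
# returns "10.42.0.5:8080" with with_port=True and "10.42.0.5" otherwise.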

def _verify_k8s_deployment(service_component_name, max_wait):
    """Verify that the k8s Deployment is ready

    Args:
    -----
    max_wait (integer): limit to how many attempts to make, which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if deployment is ready, else a DockerPluginDeploymentError exception
    will be raised.
    """
    num_attempts = 1

    while True:
        if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))

            time.sleep(1)
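
# Illustrative usage (not in the original source): polling once per second,
# _verify_k8s_deployment("s123-web", 300) waits up to roughly 300 seconds for
# the Deployment's pods to pass their readiness checks before raising
# DockerPluginDeploymentError.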

def _create_and_start_container(container_name, image, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
        - always_pull: boolean. If true, sets the image pull policy to "Always"
          so that a fresh copy of the image is always pulled. Otherwise, sets the
          image pull policy to "IfNotPresent"
        - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - replicas: number of replicas to be launched initially
        - readiness: object with information needed to create a readiness check
    '''
    env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
            "CONFIG_BINDING_SERVICE": "config-binding-service" }
    env.update(kwargs.get("envs", {}))
    ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    _, dep = k8sclient.deploy(DCAE_NAMESPACE,
                     container_name,
                     image,
                     replicas=replicas,
                     always_pull=kwargs.get("always_pull_image", False),
                     k8sconfig=plugin_conf,
                     volumes=kwargs.get("volumes", []),
                     ports=kwargs.get("ports", []),
                     msb_list=kwargs.get("msb_list"),
                     tls_info=kwargs.get("tls_info"),
                     env=env,
                     labels=kwargs.get("labels", {}),
                     log_info=kwargs.get("log_info"),
                     readiness=kwargs.get("readiness"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))

def _parse_cloudify_context(**kwargs):
    """Parse Cloudify context

    Extract what is needed. This is an impure function because it requires ctx.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name,
        "cfynodeinstance": ctx.instance.id
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    return kwargs

def _enhance_docker_params(**kwargs):
    '''
    Set up Docker environment variables and readiness check info
    and inject them into kwargs.
    '''

    # Get info for setting up readiness probe, if present
    docker_config = kwargs.get("docker_config", {})
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]

    envs = kwargs.get("envs", {})

    # Set tags on this component for its Consul registration as a service
    tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
    tags = [ str(tag) for tag in tags if tag is not None ]
    # Registrator will use this to register this component with tags. Must be
    # comma delimited.
    envs["SERVICE_TAGS"] = ",".join(tags)

    kwargs["envs"] = envs

    def combine_params(key, docker_config, kwargs):
        v = docker_config.get(key, []) + kwargs.get(key, [])
        if v:
            kwargs[key] = v
        return kwargs

    # Add the lists of ports and volumes unintelligently - meaning just add the
    # lists together with no deduping.
    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    return kwargs
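
# Illustrative note (not in the original source): for deployment_id "demo_dep"
# and service_id "demo_svc", the container environment gains
# SERVICE_TAGS="demo_dep,demo_svc", which Registrator uses when registering the
# component as a Consul service.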

def _create_and_start_component(**kwargs):
    """Create and start component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Need to be picky and manually select out pieces because just using kwargs,
    # which contains everything, confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = {
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "tls_info": kwargs.get("tls_info", {}),
        "labels": kwargs.get("labels", {}),
        "readiness": kwargs.get("readiness", {})}
    _create_and_start_container(service_component_name, image, **sub_kwargs)

    return kwargs

def _verify_component(**kwargs):
    """Verify deployment is ready"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", 300)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(service_component_name, max_wait):
        ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))

    return kwargs

def _done_for_start(**kwargs):
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs

def _setup_msb_registration(service_name, msb_reg):
    return {
        "serviceName" : service_name,
        "port" : msb_reg.get("port", "80"),
        "version" : msb_reg.get("version", "v1"),
        "url" : msb_reg.get("url_path", "/v1"),
        "protocol" : "REST",
        "enable_ssl" : msb_reg.get("uses_ssl", False),
        "visualRange" : "1"
    }

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Initiate Kubernetes deployment for service components

    This operation method is to be used with the ContainerizedServiceComponent
    node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
    that the app is up and responding successfully to readiness probes.
    """
    _done_for_start(
        **_verify_component(
            **_create_and_start_component(
                **_enhance_docker_params(
                    **_parse_cloudify_context(**start_inputs)))))

def _update_delivery_url(**kwargs):
    """Update the delivery url for data router subscribers"""
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]
        # TODO: Should NOT be setting up the delivery url with ip addresses
        # because in the https case, this will not work because data router does
        # a certificate validation using the fqdn.
        subscriber_host = _lookup_service(service_component_name, with_port=True)

        for dr_sub in dr_subs:
            scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
            if "route" not in dr_sub:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = dr_sub["route"]
            dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
                    scheme=scheme, host=subscriber_host, path=path)
            kwargs[dr_sub["name"]] = dr_sub

    return kwargs
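
# Illustrative note (not in the original source): for a subscriber whose route is
# "identity" on a component resolved to "10.42.0.5:8080", the resulting
# delivery_url is "http://10.42.0.5:8080/identity" (the scheme falls back to
# DEFAULT_SCHEME when the subscriber doesn't specify one).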

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
    """Initiate Kubernetes deployment for service components that have streams

    This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
    node type. After initiating the Kubernetes deployment, the plugin will verify with
    Kubernetes that the app is up and responding successfully to readiness probes.
    """
    _done_for_start(
        **_update_delivery_url(
            **_verify_component(
                **_create_and_start_component(
                    **_enhance_docker_params(
                        **_parse_cloudify_context(**start_inputs))))))

@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Initiate Kubernetes deployment for platform components

    This operation method is to be used with the ContainerizedPlatformComponent
    node type.
    """
    # Capture node properties
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]
    if "dns_name" in ctx.node.properties:
        service_component_name = ctx.node.properties["dns_name"]
    else:
        service_component_name = ctx.node.properties["name"]

    # Set some labels for the Kubernetes pods
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name,
        "cfynodeinstance": ctx.instance.id
    }

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required, and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 to indicate not to
    # use this and not to set a specific port mapping in cases like service
    # change handler.
    if container_port != 0:
        # Doing this because other nodes might want to use this property
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [ port_mapping ]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")

    # All of the new node properties could be handled more DRYly!
    # If a registration to MSB is required, then set up the registration info
    if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
        kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]

    # If centralized logging via ELK is desired, then set up the logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    _create_and_start_container(service_component_name, image, **kwargs)

    # Verify that the k8s deployment is ready
    max_wait = kwargs.get("max_wait", 300)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(service_component_name, max_wait):
        ctx.logger.info("k8s deployment ready for: {0}".format(service_component_name))

@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]

    _create_and_start_container(service_component_name, image, **kwargs)

@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Delete Kubernetes deployment"""
    if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
        try:
            deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
            k8sclient.undeploy(deployment_description)
        except Exception as e:
            ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
                    .format(str(e)))
    else:
        # A previous install workflow may have failed,
        # and no Kubernetes deployment info was recorded in runtime_properties.
        # No need to run the undeploy operation
        ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment"""
    service_component_name = ctx.instance.runtime_properties["service_component_name"]

    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.scale(deployment_description, replicas)
        ctx.instance.runtime_properties["replicas"] = replicas

        # Verify that the scaling took place as expected
        max_wait = kwargs.get("max_wait", 300)
        ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(service_component_name, max_wait):
            ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
    else:
        ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """Restart component with a new Docker image"""
    service_component_name = ctx.instance.runtime_properties["service_component_name"]

    if image:
        current_image = ctx.instance.runtime_properties["image"]
        ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.upgrade(deployment_description, image)
        ctx.instance.runtime_properties["image"] = image

        # Verify that the update took place as expected
        max_wait = kwargs.get("max_wait", 300)
        ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(service_component_name, max_wait):
            ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
    else:
        ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))

# TODO: Implement rollback operation when the kubernetes python client fix is available.
# (See comments in k8sclient.py.)
# In the meantime, it's possible to undo an update_image operation by doing a second
# update_image that specifies the older image.

@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
        service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

        try:
            conn = dis.create_kv_conn(CONSUL_HOST)
            dis.remove_service_component_config(conn, service_component_name)
        except dis.DiscoveryConnectionError as e:
            raise RecoverableError(e)
    else:
        # When another node in the blueprint fails install,
        # this node may not have generated a service component name.
        # There's nothing to delete from Consul.
        ctx.logger.info("No service_component_name, not attempting to delete config from Consul")

def _notify_container(**kwargs):
    """
    Notify container using the policy section in the docker_config.
    Notification consists of running a script in the application container
    in each pod in the Kubernetes deployment associated with this node.
    Return the list of notification results.
    """
    dc = kwargs["docker_config"]
    resp = []

    if "policy" in dc:
        if dc["policy"]["trigger_type"] == "docker":
            # Build the command to execute in the container:
            # SCRIPT_PATH policies {"policies" : ..., "updated_policies" : ..., "removed_policies": ...}
            script_path = dc["policy"]["script_path"]
            policy_data = {
                "policies": kwargs["policies"],
                "updated_policies": kwargs["updated_policies"],
                "removed_policies": kwargs["removed_policies"]
            }

            command = [script_path, "policies", json.dumps(policy_data)]

            # Execute the command
            deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
            resp = k8sclient.execute_command_in_deployment(deployment_description, command)

    # else the default is no trigger

    return resp
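
# Illustrative note (not in the original source): with a hypothetical script_path
# of "/opt/app/reconfigure.sh", the command executed in each pod looks like
# ["/opt/app/reconfigure.sh", "policies", '{"policies": [...],
# "updated_policies": [...], "removed_policies": [...]}'].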

@monkeypatch_loggers
@Policies.update_policies_on_node()
@operation
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default). Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
            .format(service_component_name, updated_policies, removed_policies, policies))
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    resp = _notify_container(**update_inputs)
    ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name, json.dumps(resp)))