Combine all task inputs for create and start
authorMichael Hwang <mhwang@research.att.com>
Tue, 12 Sep 2017 21:28:37 +0000 (17:28 -0400)
committerMichael Hwang <mhwang@research.att.com>
Tue, 12 Sep 2017 21:30:38 +0000 (17:30 -0400)
Change-Id: Ia86d3b26b5ecccd636fb171b3967f924b0cb1250
Issue-Id: DCAEGEN2-91
Signed-off-by: Michael Hwang <mhwang@research.att.com>
docker/dockerplugin/decorators.py
docker/dockerplugin/tasks.py
docker/dockerplugin/utils.py

index 089231a..f83263b 100644 (file)
@@ -25,6 +25,7 @@ from dockering import utils as doc
 from dockerplugin import discovery as dis
 from dockerplugin.exceptions import DockerPluginDeploymentError, \
     DockerPluginDependencyNotReadyError
+from dockerplugin import utils
 
 
 def monkeypatch_loggers(task_func):
@@ -62,19 +63,40 @@ def wrap_error_handling_start(task_start_func):
     return wrapper
 
 
-def merge_inputs_for_start(task_start_func):
+def _wrapper_merge_inputs(task_func, properties, **kwargs):
+    """Merge Cloudify properties with input kwargs before calling task func"""
+    inputs = copy.deepcopy(properties)
+    # Recursively update
+    utils.update_dict(inputs, kwargs)
+
+    # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext
+    # This has to be removed and not copied into runtime_properties else you get
+    # JSON serialization errors.
+    if "ctx" in inputs:
+        del inputs["ctx"]
+
+    return task_func(**inputs)
+
+def merge_inputs_for_create(task_create_func):
-    """Merge all inputs for start operation into one dict"""
+    """Merge all inputs for create operation into one dict"""
 
-    def wrapper (**kwargs):
-        start_inputs = copy.deepcopy(ctx.instance.runtime_properties)
-        start_inputs.update(kwargs)
+    # Needed to wrap the wrapper because I was seeing issues with
+    # "RuntimeError: No context set in current execution thread"
+    def wrapper(**kwargs):
+        # NOTE: ctx.node.properties is an ImmutableProperties instance which is
+        # why it is passed into a mutable dict so that it can be deep copied
+        return _wrapper_merge_inputs(task_create_func,
+                dict(ctx.node.properties), **kwargs)
+
+    return wrapper
 
-        # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext
-        # This has to be removed and not copied into runtime_properties else you get
-        # JSON serialization errors.
-        if "ctx" in start_inputs:
-            del start_inputs["ctx"]
+def merge_inputs_for_start(task_start_func):
+    """Merge all inputs for start operation into one dict"""
 
-        return task_start_func(**start_inputs)
+    # Needed to wrap the wrapper because I was seeing issues with
+    # "RuntimeError: No context set in current execution thread"
+    def wrapper(**kwargs):
+        return _wrapper_merge_inputs(task_start_func,
+                ctx.instance.runtime_properties, **kwargs)
 
     return wrapper
index e42e47d..9d33a8b 100644 (file)
@@ -28,7 +28,7 @@ import dockering as doc
 from dcaepolicy import Policies, POLICIES, POLICY_MESSAGE_TYPE
 from dockerplugin import discovery as dis
 from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
-    merge_inputs_for_start
+    merge_inputs_for_start, merge_inputs_for_create
 from dockerplugin.exceptions import DockerPluginDeploymentError, \
     DockerPluginDependencyNotReadyError
 from dockerplugin import utils
@@ -119,10 +119,11 @@ def _merge_policy_updates(**kwargs):
     return kwargs
 
 
+@merge_inputs_for_create
 @monkeypatch_loggers
 @Policies.gather_policies_to_node
 @operation
-def create_for_components(**kwargs):
+def create_for_components(**create_inputs):
     """Create step for Docker containers that are components
 
     This interface is responible for:
@@ -134,7 +135,7 @@ def create_for_components(**kwargs):
             **_setup_for_discovery(
                 **_merge_policy_updates(
                     **_generate_component_name(
-                        **ctx.node.properties))))
+                        **create_inputs))))
 
 
 def _parse_streams(**kwargs):
@@ -201,10 +202,11 @@ def _setup_for_discovery_streams(**kwargs):
         return kwargs
 
 
+@merge_inputs_for_create
 @monkeypatch_loggers
 @Policies.gather_policies_to_node
 @operation
-def create_for_components_with_streams(**kwargs):
+def create_for_components_with_streams(**create_inputs):
     """Create step for Docker containers that are components that use DMaaP
 
     This interface is responible for:
@@ -220,12 +222,13 @@ def create_for_components_with_streams(**kwargs):
                     **_merge_policy_updates(
                         **_parse_streams(
                             **_generate_component_name(
-                                **ctx.node.properties))))))
+                                **create_inputs))))))
 
 
+@merge_inputs_for_create
 @monkeypatch_loggers
 @operation
-def create_for_platforms(**kwargs):
+def create_for_platforms(**create_inputs):
     """Create step for Docker containers that are platform components
 
     This interface is responible for:
@@ -234,7 +237,7 @@ def create_for_platforms(**kwargs):
     """
     _done_for_create(
             **_setup_for_discovery(
-                **ctx.node.properties))
+                **create_inputs))
 
 
 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
index ed680c2..c45af68 100644 (file)
 
 import string
 import random
+import collections
 
 
 def random_string(n):
     """Random generate an ascii string of "n" length"""
     corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits
     return ''.join(random.choice(corpus) for x in range(n))
+
+
+def update_dict(d, u):
+    """Recursively updates dict
+
+    Update dict d with dict u
+    """
+    for k, v in u.items():
+        if isinstance(v, collections.Mapping):
+            r = update_dict(d.get(k, {}), v)
+            d[k] = r
+        else:
+            d[k] = u[k]
+    return d