DMaaP add support for useExisting 67/91767/1
authorJack Lucas <jflucas@research.att.com>
Fri, 19 Jul 2019 13:10:01 +0000 (09:10 -0400)
committerJack Lucas <jflucas@research.att.com>
Fri, 19 Jul 2019 19:26:46 +0000 (15:26 -0400)
Issue-ID: DCAEGEN2-1670
Signed-off-by: Jack Lucas <jflucas@research.att.com>
Change-Id: I0a945117b7f4084191245ea56af2712b2cd20d4c

.gitignore
dmaap/dmaap.yaml
dmaap/dmaapcontrollerif/dmaap_requests.py
dmaap/dmaapplugin/dr_lifecycle.py
dmaap/dmaapplugin/mr_lifecycle.py
dmaap/setup.py

index 71d606c..1c61431 100644 (file)
@@ -12,3 +12,4 @@ helm/nosetests.xml
 **/*.wgn
 **/*.egg-info
 **/.testenv
+.vscode/*
index ed56cf7..4a47a7f 100644 (file)
@@ -25,7 +25,7 @@ plugins:
   dmaapplugin:
     executor: 'central_deployment_agent'
     package_name: dmaap
-    package_version: 1.3.4
+    package_version: 1.3.5
 
 
 node_types:
@@ -47,6 +47,9 @@ node_types:
       aspr_classification:
         type: string
         required: false
+      useExisting:
+        type: boolean
+        required: false
 
     interfaces:
       cloudify.interfaces.lifecycle:
@@ -115,6 +118,9 @@ node_types:
       global_mr_url:
         type: string
         required: false
+      useExisting:
+        type: boolean
+        required: false
 
     interfaces:
       cloudify.interfaces.lifecycle:
index 813a1d8..231953a 100644 (file)
@@ -89,11 +89,11 @@ class DMaaPControllerHandle(object):
     ### PUBLIC API ###
 
     # Data Router Feeds
-    def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None):
+    def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None):
         '''
         Create a DMaaP data router feed with the given feed name
         and (optionally) feed version, feed description, ASPR classification,
-        and owner
+        owner, and useExisting flag
         '''
         feed_definition = {'feedName' : name}
         if version:
@@ -104,8 +104,11 @@ class DMaaPControllerHandle(object):
             feed_definition['asprClassification'] = aspr_class
         if owner:
             feed_definition['owner'] = owner
+        feeds_path_query = self.feeds_path
+        if useExisting == True:                         # It's a boolean!
+            feeds_path_query += "?useExisting=true"
 
-        return self._create_resource(self.feeds_path, feed_definition)
+        return self._create_resource(feeds_path_query, feed_definition)
 
     def get_feed_info(self, feed_id):
         '''
@@ -197,12 +200,12 @@ class DMaaPControllerHandle(object):
         return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))
 
     # Message router topics
-    def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None):
+    def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None, useExisting = None):
         '''
         Create a message router topic with the topic name 'name' and optionally the topic_description
-        'description', the 'txenable' flag and the topic owner 'owner'.
+        'description', the 'txenable' flag, the 'useExisting' flag and the topic owner 'owner'.
         '''
-        topic_definition = {'topicName' : name};
+        topic_definition = {'topicName' : name}
         if description:
             topic_definition['topicDescription'] = description
         if owner:
@@ -213,8 +216,11 @@ class DMaaPControllerHandle(object):
             topic_definition['replicationCase'] = replication_case
         if global_mr_url:
             topic_definition['globalMrURL'] = global_mr_url
+        topics_path_query = self.topics_path
+        if useExisting == True:                         # It's a boolean!
+            topics_path_query += "?useExisting=true"
 
-        return self._create_resource(self.topics_path, topic_definition)
+        return self._create_resource(topics_path_query, topic_definition)
 
     def get_topic_info(self, fqtn):
         '''
index 29811fa..718158a 100644 (file)
@@ -31,9 +31,10 @@ def create_feed(**kwargs):
     Create a new data router feed
         Expects "feed_name" to be set in node properties
         If 'feed_name' is not set or is empty, generates a random one.
-        Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
+        Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties
         (Sets default values if not provided )
         Sets instance runtime properties:
+        Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists.
             - "feed_id"
             - "publish_url"
             - "log_url"
@@ -58,11 +59,15 @@ def create_feed(**kwargs):
             aspr_classification = ctx.node.properties["aspr_classification"]
         else:
             aspr_classification = "unclassified"
+        if "useExisting" in ctx.node.properties.keys():
+            useExisting = ctx.node.properties["useExisting"]
+        else:
+            useExisting = False
 
         # Make the request to the controller
         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
         ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
-        f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER)
+        f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting)
         f.raise_for_status()
 
         # Capture important properties from the result
@@ -102,11 +107,11 @@ def get_existing_feed(**kwargs):
             if f is None:
                 ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
                 raise ValueError("Not find existing feed with feed name " + feed_name)
-        else: 
+        else:
             raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
 
         f.raise_for_status()
-        
+
         # Capture important properties from the result
         feed = f.json()
         feed_id = feed["feedId"]
@@ -118,17 +123,17 @@ def get_existing_feed(**kwargs):
         else:
             ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
 
-    except ValueError as e:                            
-        ctx.logger.error("{er}".format(er=e)) 
+    except ValueError as e:
+        ctx.logger.error("{er}".format(er=e))
         raise NonRecoverableError(e)
-    except Exception as e:                            
+    except Exception as e:
         if feed_id_input:
-            ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) 
+            ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
         else:
-            ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e)) 
+            ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
         raise NonRecoverableError(e)
 
-    
+
 @operation
 def delete_feed(**kwargs):
     '''
index a4f04ec..ec674de 100644 (file)
@@ -28,12 +28,14 @@ from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
 def create_topic(**kwargs):
     '''
     Creates a message router topic.
-    Allows 'topic_name', 'topic_description', 'txenable', 'replication_case',
-    and 'global_mr_url'  as optional node properties.  If 'topic_name' is not set,
+    Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url',
+    and 'useExisting' as optional node properties.  If 'topic_name' is not set,
     generates a random one.
     Sets 'fqtn' in the instance runtime_properties.
     Note that 'txenable' is a Message Router flag indicating whether transactions
     are enabled on the topic.
+    Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if
+    the topic already exists.
     '''
     try:
         # Make sure there's a topic_name
@@ -66,11 +68,15 @@ def create_topic(**kwargs):
         else:
             global_mr_url = None
 
+        if "useExisting" in ctx.node.properties:
+            useExisting = ctx.node.properties["useExisting"]
+        else:
+            useExisting = False
 
         # Make the request to the controller
         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
         ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
-        t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
+        t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting)
         t.raise_for_status()
 
         # Capture important properties from the result
index 955f181..c423d95 100644 (file)
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 
 setup(
     name = "dmaap",
-    version = "1.3.4",
+    version = "1.3.5",
     packages=find_packages(),
     author = "AT&T",
     description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."),