Add seed code for DMaaP plugin 19/9919/1
authorJack Lucas <jflucas@research.att.com>
Fri, 1 Sep 2017 13:48:08 +0000 (13:48 +0000)
committerJack Lucas <jflucas@research.att.com>
Fri, 1 Sep 2017 13:50:18 +0000 (13:50 +0000)
Change-Id: I8c7a9c432badd3052a571ed87b9b580760b376e6
Issue-Id: CCSDK-65
Signed-off-by: Jack Lucas <jflucas@research.att.com>
19 files changed:
dmaap/.gitignore [new file with mode: 0644]
dmaap/LICENSE.txt [new file with mode: 0644]
dmaap/README.md [new file with mode: 0644]
dmaap/consulif/__init__.py [new file with mode: 0644]
dmaap/consulif/consulif.py [new file with mode: 0644]
dmaap/dmaap.yaml [new file with mode: 0644]
dmaap/dmaapcontrollerif/__init__.py [new file with mode: 0644]
dmaap/dmaapcontrollerif/dmaap_requests.py [new file with mode: 0644]
dmaap/dmaapplugin/__init__.py [new file with mode: 0644]
dmaap/dmaapplugin/dmaaputils.py [new file with mode: 0644]
dmaap/dmaapplugin/dr_bridge.py [new file with mode: 0644]
dmaap/dmaapplugin/dr_lifecycle.py [new file with mode: 0644]
dmaap/dmaapplugin/dr_relationships.py [new file with mode: 0644]
dmaap/dmaapplugin/mr_lifecycle.py [new file with mode: 0644]
dmaap/dmaapplugin/mr_relationships.py [new file with mode: 0644]
dmaap/requirements.txt [new file with mode: 0644]
dmaap/setup.py [new file with mode: 0644]
dmaap/tests/test_plugin.py [new file with mode: 0644]
dmaap/tox.ini [new file with mode: 0644]

diff --git a/dmaap/.gitignore b/dmaap/.gitignore
new file mode 100644 (file)
index 0000000..af8e550
--- /dev/null
@@ -0,0 +1,69 @@
+.project
+.pydevproject
+wheels
+venv
+cdap.zip
+docker.zip
+*.wgn
+*.swp
+*.swn
+*.swo
+.DS_Store
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
diff --git a/dmaap/LICENSE.txt b/dmaap/LICENSE.txt
new file mode 100644 (file)
index 0000000..f90f8f1
--- /dev/null
@@ -0,0 +1,17 @@
+============LICENSE_START=======================================================
+org.onap.ccsdk
+================================================================================
+Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
diff --git a/dmaap/README.md b/dmaap/README.md
new file mode 100644 (file)
index 0000000..646400f
--- /dev/null
@@ -0,0 +1,322 @@
+## Cloudify DMaaP Plugin
+Cloudify plugin for creating and managing DMaaP Data Router feeds and subscriptions and
+DMaaP Message Router topics.   The plugin uses the DMaaP Bus Controller API.
+
+### Plugin Support for DMaaP Data Router
+#### Plugin Types for DMaaP Data Router
+The Cloudify type definitions for DMaaP Data Router nodes and relationships
+are defined in [`dmaap.yaml`](./dmaap.yaml).
+
+There are four node types for DMaaP Data Router:
+
+- `ccsdk.nodes.Feed`: This type represents a feed that does not yet
+exist and that should be created when the install workflow is
+run against a blueprint that contains a node of this type.
+
+Property|Type|Required?|Description                           |
+--------|----|---------|---------------------------------------
+feed_name|string|no|a name that identifies the feed (plugin will generate if absent)
+feed_version|string|no|version number for the feed (feed_name + feed_version uniquely identify the feed in DR)
+feed_description|string|no|human-readable description of the feed
+aspr_classification|string|no|AT&T ASPR classification of the feed
+
+
+- `ccsdk.nodes.ExistingFeed`: This type represents a feed that
+already exists.  Nodes of this type are placed in a blueprint so
+that other nodes in the blueprint can be set up as publishers or
+subscribers to the feed. The table below shows the properties that a node
+of this type may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+feed_id|string|yes|Feed identifier assigned by DMaaP when the feed was created
+
+- `ccsdk.nodes.ExternalTargetFeed`:  This type represents a feed created in an external DMaaP
+environment (i.e., an environment that the plugin cannot access to make provisioning requests, such as
+a shared corporate system).  Nodes of this type are placed in a blueprint so that other feed nodes of
+type `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed` can be set up to "bridge" to external feeds by
+publishing data to the external feeds.  The table below shows the properties that a node of this type
+may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+url|string|yes|The publish URL of the external feed.
+username|string|yes|The username to be used when delivering to the external feed
+userpw|string|yes|The password to be used when delivering to the external feed
+
+_Note: These properties are usually obtained by manually creating a feed in the external
+DMaaP DR system and then creating a publisher for that feed._
+
+- `ccsdk.nodes.ExternalSourceFeed`:  This type represents a feed created in an external DMaaP
+environment (i.e., an environment that the plugin cannot access to makes provisioning requests, such as
+a shared corporate system).  Nodes of this type are place in a blueprint so that they can be set up to
+"bridge" to other feed nodes of type `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`.  This type
+has no node properties, but when a bridge is set up, the url, username, and password are attached to the
+node as runtime_properties, using the name of the target feed node as the top-level key.
+
+There are five relationship types for DMaaP Data Router:
+
+- `ccsdk.relationships.publish_files`,  used to
+indicate that the relationship's source node sends is a publisher to the
+Data Router feed represented by the relationship's target node.
+- `ccsdk.relationships.subscribe_to_files`, used to
+indicate that the relationship's source node is a subscriber to the
+Data Router feed represented by the relationship's target node.
+- `ccsdk.relationships.bridges_to`, used to indicate that the relationship's source
+node (a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`) should be set up
+to forward data ("bridge") to the relationship's target feed (another `ccsdk.nodes.Feed` or
+`ccsdk.nodes.ExistingFeed`).
+- `ccsdk.relationships.bridges_to_external`, used to indicate that the relationship's source
+node (a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`) should be set up
+to forward data  ("bridge") to the relationship's target node (a feed in an external DMaaP system,
+represented by a `ccsdk.nodes.ExternalTargetFeed` node).
+- `ccsdk.relationships.bridges_from_external_to_internal`, used to indicate the the relationship's source
+node (a feed in an external DMaaP system, represented by a `ccsdk.nodes.ExternalSourceFeed` node) should be set up to forward date ("bridge")
+to the relationship's target node (an internal ONAP feed, represented by a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed` node).
+
+The plugin code implements the lifecycle operations needed to create and
+delete feeds and to add and remove publishers and subscribers.  It also implements
+the operations needed to set up bridging between feeds.
+
+#### Interaction with Other Plugins
+When creating a new feed or processing a reference to an existing feed,
+the plugin operates independently of other plugins.
+
+When processing a `ccsdk.relationships.publish_files` relationship or a
+`ccsdk.relationships.subscribe_to_files` relationship, this plugin needs
+to obtain data from the source node and, in the case of `publish_files`, provide
+data to the source node.  Certain conventions are therefore needed for
+passing data between this plugin and the plugins responsible for the source
+nodes in these relationships.  In Cloudify, the mechanism for
+sharing data among plugins is the `ctx.instance.runtime_properties` dictionary
+associated with each node.
+
+A given source node may have relationships with several feeds.  For example, an ONAP DCAE
+data collector might publish two different types of data to two different feeds.  An ONAP DCAE
+analytics module might subscribe to one feed to get input for its processing and
+publish its results to a different feed.   When this DMaaP plugin and the plugin for the
+source node exchange information, they need to do in a way that lets them distinguish
+among different feeds.   We do this through a simple convention:  for each source node
+to feed relationship, the source node plugin will create a property in the source node's
+`runtime_properties` dictionary.  The name of the property will be the same as the
+name of the target node of the relationship.  For instance, if a node has a
+`publishes_files` relationship with a target node named `feed00`, then the plugin that's
+responsible for managing the source node with create an entry in the source node's
+`runtime_properties` dictionary named `feed00`.  This entry itself will be a dictionary.
+
+The content of this data exchange dictionary depends on whether the source node is a
+publisher (i.e., the relationship is `publish_files`) or a subscriber (i.e., the
+relationship is `subscribe_to_files`).
+
+For the `publish_files` relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the publisher, used to set up routing
+publish_url|DMaaP plugin|the URL to which the publisher makes Data Router publish requests
+log_url|DMaaP plugin|the URL from which log data for the feed can be obtained
+username|DMaaP plugin|the username (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router
+password|DMaaP plugin|the password (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router
+
+For the `subscribe_to_files` relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the subscriber, used to set up routing
+delivery_url|source node plugin|the URL to which the Data Router should deliver files
+username|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering files
+password|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering file
+
+### Plugin Support for DMaaP Message Router
+#### Plugin Types for DMaaP Message Router
+The Cloudify type definitions for DMaaP Message Router nodes and relationships
+are defined in [`dmaap.yaml`](./dmaap.yaml).
+
+There are two node types for DMaaP Message Router:
+
+- `ccsdk.nodes.Topic`: This type represents a topic that does not yet
+exist and that should be created when the install workflow is
+run against a blueprint that contains a node of this type.
+
+Property|Type|Required?|Description
+--------|----|---------|---------------------------------------
+topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent)
+topic_description|string|no|human-readable description of the feed
+txenable|boolean|no|flag indicating whether transactions are enabled for this topic
+replication_case|string|no|type of replication required for the topic (defaults to no replication)
+global_mr_url|string|no|Global MR host name for replication to a global MR instance
+
+Note: In order to set up topics, a user should be familiar with message router and how it is configured,
+and this README is not the place to explain the details of message router. Here are a couple of pieces of
+information that might be helpful.
+Currently, the allowed values for `replication_case` are:
+
+- `REPLICATION_NONE`
+- `REPLICATION_EDGE_TO_CENTRAL`
+- `REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL`
+- `REPLICATION_CENTRAL_TO_EDGE`
+- `REPLICATION_CENTRAL_TO_GLOBAL`
+- `REPLICATION_GLOBAL_TO_CENTRAL`
+- `REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE`
+
+The `global_mr_url` is actually a host name, not a full URL.  It points to a host in a global message router
+cluster.  (A 'global' message router cluster is one that's not part of ONAP.)
+
+- `ccsdk.nodes.ExistingTopic`: This type represents a topic that
+already exists.  Nodes of this type are placed in a blueprint so
+that other nodes in the blueprint can be set up as publishers or
+subscribers to the topic. The table below shows the properties that a node
+of this type may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+fqtn|string|yes|fully-qualified topic name for the topic
+
+#### Interaction with Other Plugins
+When creating a new topic or processing a reference to an existing topic,
+the plugin operates independently of other plugins.
+
+When processing a `ccsdk.relationships.publish_events` relationship or a
+`ccsdk.relationships.subscribe_to_events` relationship, this plugin needs
+to obtain data from  and provide data to the source node. Certain conventions are therefore needed for
+passing data between this plugin and the plugins responsible for the source
+nodes in these relationships.  In Cloudify, the mechanism for
+sharing data among plugins is the `ctx.instance.runtime_properties` dictionary
+associated with each node.
+
+A given source node may have relationships with several topics.  For example, an ONAP DCAE
+analytics module might subscribe to one topic to get input for its processing and
+publish its results to a different topic.   When this DMaaP plugin and the plugin for the
+source node exchange information, they need to do in a way that lets them distinguish
+among different feeds.   We do this through a simple convention:  for each source node
+to topic relationship, the source node plugin will create a property in the source node's
+`runtime_properties` dictionary.  The name of the property will be the same as the
+name of the target node of the relationship.  For instance, if a node has a
+`publishes_events` relationship with a target node named `topic00`, then the plugin that's
+responsible for managing the source node with create an entry in the source node's
+`runtime_properties` dictionary named `topic00`.  This entry itself will be a dictionary.
+
+For both types of relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the publisher or subscriber, used to set up routing
+client_role|source node plugin|the AAF client role that's requesting publish or subscribe access to the topic
+topic_url|DMaaP plugin|the URL for accessing the topic to publish or receive events
+
+### Interaction with Consul configuration store
+In addition to storing the results of DMaaP Data Router and DMaaP Message Router provisioning operations in `runtime_properties`,
+the DMaaP plugin also stores these results into the ONAP configuration store, which resides in a
+[Consul key-value store](https://www.consul.io/).  This allows DMaaP clients (components that act as publishers, subscribers, or both)
+to retrieve their DMaaP configuration information from Consul, rather than having the plugin that deploys the client directly
+configure the client using data in `runtime_properties`.
+
+The `runtime_properties` for a client must contain a property called `service_component_name`.  If this property is not present,
+the plugin will raise a NonRecoverableError and cause the installation to fail.
+
+If `service_component_name` is present, then the plugin will use a Consul key consisting of the value
+of `service_component_name` prepended to the fixed string `:dmaap`.   For example, if the `service_component_name`
+is `client123`, the plugin will use `client123:dmaap` as the key for storing DMaaP information into Consul.
+Information for all of the feeds and topics for a client are stored under the same key.
+
+The value stored is a nested JSON object.  At the top level of the object are properties representing each topic or feed
+for which the component is a publisher or subscriber.  The name of the property is the node name of the target feed or topic.
+The value of the property is another JSON object that corresponds to the dictionary that the plugin created in
+`runtime_properties` corresponding to the target feed or topic.  Note that the information in Consul includes
+all of the properties for the feed or topic, those set by the source node plugin as well as those set by the DMaaP plugin.
+
+Examples:
+
+Data Router publisher, target feed `feed00`:
+```
+{
+  "feed00": {
+    "username": "rC9QR51I",
+    "log_url": "https://dmaap.example.com/feedlog/972",
+    "publish_url": "https://dmaap.example.com/publish/972",
+    "location": "loc00",
+    "password": "QOQeUh5KLR",
+    "publisher_id": "972.360gm"
+  }
+}
+```
+
+Data Router subscriber, target feed `feed01`:
+```
+{
+  "feed01": {
+    "username": "drdeliver",
+    "password": "1loveDataR0uter",
+    "location": "loc00",
+    "delivery_url": "https://example.com/whatever",
+    "subscriber_id": "1550"
+  }
+}
+```
+
+Message Router publisher to `topic00`, subscriber to `topic01`.  Note how each topic
+appears as a top-level property in the object.
+```
+{
+  "topic00": {
+    "topic_url": "https://dmaap.example.com:3905/events/org.onap.ccsdk.dmaap.FTL2.outboundx",
+    "client_role": "org.onap.ccsdk.member",
+    "location": "loc00",
+    "client_id": "1494621774522"
+  },
+  "topic01": {
+    "topic_url": "https://dmaap.example.com:3905/events/org.onap.ccsdk.dmaap.FTL2.inboundx",
+    "client_role": "org.onap.ccsdk.member",
+    "location": "loc00",
+    "client_id": "1494621778627"
+  }
+}
+```
+
+### Packaging and installing
+The DMaaP plugin is meant to be used as a [Cloudify managed plugin](http://docs.getcloudify.org/3.4.0/plugins/using-plugins/). Managed plugins
+are packaged using [`wagon`](https://github.com/cloudify-cosmo/wagon).
+
+To package this plugin, executing the following command in the top-level directory of this plugin, from a Python environment in which `wagon` has been installed:
+```
+wagon create -s . -r -o /path/to/directory/for/wagon/output
+```
+Once the wagon file is built, it can be uploaded to a Cloudify Manager host using the `cfy plugins upload` command described in the documentation above.
+
+Managed plugins can also be loaded at the time a Cloudify Manager host is installed, via the installation blueprint and inputs file.  We expect that this plugin will
+be loaded at Cloudify Manager installation time, and that `cfy plugins upload` will be used only for delivering patches between releases.
+
+### Configuration
+The plugin needs to be configured with certain parameters needed to access the DMaaP Bus Controller.  In keeping with the ONAP architecture, this information is
+stored in Consul.
+
+The plugin finds the address and port of the DMaaP Bus Controller using the Consul service discovery facility.  The plugin expects the Bus Controller to be
+registered under the name `dmaap_bus_controller`.
+
+Additional parameters come from the `dmaap` key in the Cloudify Manager's Consul configuration, which is stored in the Consul KV store under the key name
+'cloudify_manager'.  The table below lists the properties in the configuration:
+
+Property|Type|Required?|Default|Description
+--------|----|---------|-------|--------------------------------
+`username`|string|Yes|(none)|The username for logging into DMaaP Bus Controller
+`password`|string|Yes|(none)|The password for logging into DMaaP Bus Controller
+`owner`|string|Yes|(none)|The name to be used as the owner for entities created by the plugin
+`protocol`|string|No|`https`|The protocol (URL scheme) used to access the DMaaP bus controller (`http` or `https`)
+`path`|string|No|`webapi`|The path to the root of the DMaaP Bus Controller API endpoint
+
+Here is an example of a Cloudify Manager configuration object showing only the `dmaap` key:
+```
+{
+  "dmaap": {
+    "username": "dmaap.client@ccsdkorch.onap.org",
+    "password": "guessmeifyoucan"
+    "owner": "ccsdkorc"
+  },
+
+  (other configuration here)
+
+}
+```
diff --git a/dmaap/consulif/__init__.py b/dmaap/consulif/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/dmaap/consulif/consulif.py b/dmaap/consulif/consulif.py
new file mode 100644 (file)
index 0000000..e742895
--- /dev/null
@@ -0,0 +1,120 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import consul
+import json
+from urlparse import urlparse
+
+class ConsulHandle(object):
+    '''
+    Provide access to Consul KV store and service discovery
+    '''
+
+    def __init__(self, api_url, user, password, logger):
+        '''
+        Constructor
+        '''
+        u = urlparse(api_url)
+        self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)
+
+    def get_config(self, key):
+        '''
+        Get configuration information from Consul using the provided key.
+        It should be in JSON form.  Convert it to a dictionary
+        '''
+        (index, val) = self.ch.kv.get(key)
+        config = json.loads(val['Value'])        # will raise ValueError if not JSON, let it propagate
+        return config
+
+    def get_service(self,service_name):
+        '''
+        Look up the service named service_name in Consul.
+        Return the service address and port.
+        '''
+        (index, val) = self.ch.catalog.service(service_name)
+        if len(val) > 0:                # catalog.service returns an empty array if service not found
+            service = val[0]            # Could be multiple listings, but we take the first
+            if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0):
+                service_address = service['ServiceAddress']    # Most services should have this
+            else:
+                service_address = service['Address']         # "External" services will have this only
+            service_port = service['ServicePort']
+        else:
+            raise Exception('Could not find service information for "{0}"'.format(service_name))
+
+        return service_address, service_port
+
+    def add_to_entry(self, key, add_name, add_value):
+        '''
+        Find 'key' in consul.
+        Treat its value as a JSON string representing a dict.
+        Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
+        Turn the resulting extended dict into a JSON string.
+        Store the string back into Consul under 'key'.
+        Watch out for conflicting concurrent updates.
+
+        Example:
+        Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
+        add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
+        should result in the value for key 'xyz:dmaap' in consul being updated to
+        '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
+        '''
+
+        while True:     # do until update succeeds
+            (index, val) = self.ch.kv.get(key)     # index gives version of key retrieved
+
+            if val is None:     # no key yet
+                vstring = '{}'
+                mod_index = 0   # Use 0 as the cas index for initial insertion of the key
+            else:
+                vstring = val['Value']
+                mod_index = val['ModifyIndex']
+
+            # Build the updated dict
+            # Exceptions just propagate
+            v = json.loads(vstring)
+            v[add_name] = add_value
+            new_vstring = json.dumps(v)
+
+            updated = self.ch.kv.put(key, new_vstring, cas=mod_index)       # if the key has changed since retrieval, this will return false
+            if updated:
+                break
+
+
+    def delete_entry(self,entry_name):
+        '''
+        Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.
+
+        Note that the kv.delete() operation always returns True,
+        whether there's an entry with key 'entry_name' exists or not.  This doesn't seem like
+        a great design, but it means it's safe to try to delete the same entry repeatedly.
+
+        Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
+        feeds we've stored into the 'component_name:dmaap' entry.
+
+        Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
+        'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
+        subscribes relationship.   The first unlink will actually remove the entry, the subsequent ones
+        will harmlessly try to remove it again.
+
+        The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
+        the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
+        entry back into Consul.  It would be very similar to add_from_entry.  When there's nothing left
+        in the entry, then the entire entry would be deleted.
+        '''
+        self.ch.kv.delete(entry_name)
diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml
new file mode 100644 (file)
index 0000000..1c3ff43
--- /dev/null
@@ -0,0 +1,193 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+
+# Types and relationships for DMaaP data router feeds
+
+tosca_definitions_version: cloudify_dsl_1_3
+
+imports:
+  - http://www.getcloudify.org/spec/cloudify/3.4/types.yaml
+
+plugins:
+  dmaapplugin:
+    executor: 'central_deployment_agent'
+    package_name: cloudifydmaapplugin
+    package_version: 1.2.0
+
+
+node_types:
+
+  # Data Router feed to be created
+  ccsdk.nodes.Feed:
+    derived_from: cloudify.nodes.Root
+
+    properties:
+      feed_name:
+        type: string
+        required: false
+      feed_version:
+        type: string
+        required: false
+      feed_description:
+        type: string
+        required: false
+      aspr_classification:
+        type: string
+        required: false
+
+    interfaces:
+      cloudify.interfaces.lifecycle:
+        create:
+          implementation:
+            dmaapplugin.dmaapplugin.dr_lifecycle.create_feed
+        delete:
+          implementation:
+            dmaapplugin.dmaapplugin.dr_lifecycle.delete_feed
+
+  # Existing Data Router feed to be used as target for publishing/subscribing
+  ccsdk.nodes.ExistingFeed:
+    derived_from: cloudify.nodes.Root
+
+    properties:
+      feed_id:
+        type: string
+        required: true
+
+    interfaces:
+      cloudify.interfaces.lifecycle:
+        configure:
+          implementation:
+            dmaapplugin.dmaapplugin.dr_lifecycle.get_existing_feed
+
+  # Existing Global Data Router feed (created via Invenio) to be used as target for bridging
+  ccsdk.nodes.ExternalTargetFeed:
+    derived_from: cloudify.nodes.Root
+
+    properties:
+      url:
+        type: string
+        required: true
+      username:
+        type: string
+        required: true
+      userpw:
+        type: string
+        required: true
+
+  # Global Data Router feed to be used as a source for bridging
+  # Has no properties
+  ccsdk.nodes.ExternalSourceFeed:
+    derived_from: cloudify.nodes.Root
+
+  # Message Router topic to be created
+  ccsdk.nodes.Topic:
+    derived_from: cloudify.nodes.Root
+
+    properties:
+      topic_name:
+        type: string
+        required: false
+      topic_description:
+        type: string
+        required: false
+      txenable:
+        type: boolean
+        required: false
+      replication_case:
+        type: string
+        required: false
+      global_mr_url:
+        type: string
+        required: false
+
+    interfaces:
+      cloudify.interfaces.lifecycle:
+        create:
+          implementation:
+            dmaapplugin.dmaapplugin.mr_lifecycle.create_topic
+        delete:
+          implementation:
+            dmaapplugin.dmaapplugin.mr_lifecycle.delete_topic
+
+  # Existing Message Router topic to be used as target for publishing/subscribing
+  ccsdk.nodes.ExistingTopic:
+    derived_from: cloudify.nodes.Root
+
+    properties:
+      fqtn:
+        type: string
+        required: true
+
+    interfaces:
+      cloudify.interfaces.lifecycle:
+        configure:
+          implementation:
+            dmaapplugin.dmaapplugin.mr_lifecycle.get_existing_topic
+
+relationships:
+
+  ccsdk.relationships.publish_files:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_publisher
+        unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_publisher
+
+  ccsdk.relationships.subscribe_to_files:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        establish: dmaapplugin.dmaapplugin.dr_relationships.add_dr_subscriber
+        unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_subscriber
+
+  ccsdk.relationships.publish_events:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_publisher
+        unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client
+
+  ccsdk.relationships.subscribe_to_events:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_subscriber
+        unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client
+
+  ccsdk.relationships.bridges_to:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_dr_bridge
+        unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
+  ccsdk.relationships.bridges_to_external:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_dr_bridge
+        unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
+  ccsdk.relationships.bridges_from_external_to_internal:
+    derived_from: cloudify.relationships.connected_to
+    target_interfaces:
+      cloudify.interfaces.relationship_lifecycle:
+        preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_source_dr_bridge
+        unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
diff --git a/dmaap/dmaapcontrollerif/__init__.py b/dmaap/dmaapcontrollerif/__init__.py
new file mode 100644 (file)
index 0000000..611169f
--- /dev/null
@@ -0,0 +1 @@
+# DMaaP Bus Controller interface library
\ No newline at end of file
diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py
new file mode 100644 (file)
index 0000000..eb6fe1b
--- /dev/null
@@ -0,0 +1,256 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import requests
+
+### "Constants"
+FEEDS_PATH = '/feeds'
+PUBS_PATH = '/dr_pubs'
+SUBS_PATH = '/dr_subs'
+TOPICS_PATH = '/topics'
+CLIENTS_PATH = '/mr_clients'
+LOCATIONS_PATH = '/dcaeLocations'
+
+class DMaaPControllerHandle(object):
+    '''
+    A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module
+    '''
+
+    def __init__(self, api_url, user, password, logger,
+                 feeds_path = FEEDS_PATH,
+                 pubs_path = PUBS_PATH,
+                 subs_path = SUBS_PATH,
+                 topics_path = TOPICS_PATH,
+                 clients_path = CLIENTS_PATH):
+        '''
+        Constructor
+        '''
+        self.api_url = api_url        # URL for the root of the Controller resource tree, no trailing "/"
+        self.auth = (user, password)  # user name and password for HTTP basic auth
+        self.logger = logger
+        self.feeds_path = feeds_path
+        self.pubs_path = pubs_path
+        self.subs_path = subs_path
+        self.topics_path = topics_path
+        self.clients_path = clients_path
+
+
+    ### INTERNAL FUNCTIONS ###
+
+    def _make_url(self, path):
+        '''
+        Make a full URL given the path relative to the root
+        '''
+        if not path.startswith('/'):
+            path = '/' + path
+
+        return self.api_url + path
+
+    def _get_resource(self, path):
+        '''
+        Get the DMaaP resource at path, where path is relative to the root.
+        '''
+        url = self._make_url(path)
+        self.logger.info("Querying URL: {0}".format(url))
+        return requests.get(url, auth=self.auth)
+
+    def _create_resource(self, path, resource_content):
+        '''
+        Create a DMaaP resource by POSTing to the resource collection
+        identified by path (relative to root), using resource_content as the body of the post
+        '''
+        url = self._make_url(path)
+        self.logger.info("Posting to URL: {0} with body: {1}".format(url, resource_content))
+        return requests.post(url, auth=self.auth, json=resource_content)
+
+    def _delete_resource(self, path):
+        '''
+        Delete the DMaaP resource at path, where path is relative to the root.
+        '''
+        url = self._make_url(path)
+        self.logger.info("Deleting URL: {0}".format(url))
+        return requests.delete(url, auth=self.auth)
+
+    ### PUBLIC API ###
+
+    # Data Router Feeds
+    def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None):
+        '''
+        Create a DMaaP data router feed with the given feed name
+        and (optionally) feed version, feed description, ASPR classification,
+        and owner
+        '''
+        feed_definition = {'feedName' : name}
+        if version:
+            feed_definition['feedVersion'] = version
+        if description:
+            feed_definition['feedDescription'] = description
+        if aspr_class:
+            feed_definition['asprClassification'] = aspr_class
+        if owner:
+            feed_definition['owner'] = owner
+
+        return self._create_resource(self.feeds_path, feed_definition)
+
+    def get_feed_info(self, feed_id):
+        '''
+        Get the representation of the DMaaP data router feed whose feed id is feed_id.
+        '''
+        return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+    def delete_feed(self, feed_id):
+        '''
+        Delete the DMaaP data router feed whose feed id is feed_id.
+        '''
+        return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+    # Data Router Publishers
+    def add_publisher(self, feed_id, location, username, password, status=None):
+        '''
+        Add a publisher to feed feed_id at location location with user, pass, and status
+        '''
+        publisher_definition = {
+            'feedId' : feed_id,
+            'dcaeLocationName' : location,
+            'username' : username,
+            'userpwd' : password
+        }
+
+        if status:
+            publisher_definition['status'] = status
+
+        return self._create_resource(self.pubs_path, publisher_definition)
+
+    def get_publisher_info(self, pub_id):
+        '''
+        Get the representation of the DMaaP data router publisher whose publisher id is pub_id
+        '''
+        return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id))
+
+    def delete_publisher(self, pub_id):
+        '''
+        Delete the DMaaP data router publisher whose publisher id is id.
+        '''
+        return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id))
+
+
+    # Data Router SUbscrihers
+    def add_subscriber(self, feed_id, location, delivery_url, username, password, status=None):
+        '''
+        Add a publisher to feed feed_id at location location with user, pass, and status
+        '''
+        subscriber_definition = {
+            'feedId' : feed_id,
+            'dcaeLocationName' : location,
+            'deliveryURL' : delivery_url,
+            'username' : username,
+            'userpwd' : password
+        }
+
+        if status:
+            subscriber_definition['status'] = status
+
+        return self._create_resource(self.subs_path, subscriber_definition)
+
+    def get_subscriber_info(self, sub_id):
+        '''
+        Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id
+        '''
+        return self._get_resource("{0}/{1}".format(self.subs_path, sub_id))
+
+    def delete_subscriber(self, sub_id):
+        '''
+        Delete the DMaaP data router subscriber whose subscriber id is sub_id.
+        '''
+        return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))
+
+    # Message router topics
+    def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None):
+        '''
+        Create a message router topic with the topic name 'name' and optionally the topic_description
+        'description', the 'txenable' flag and the topic owner 'owner'.
+        '''
+        topic_definition = {'topicName' : name};
+        if description:
+            topic_definition['topicDescription'] = description
+        if owner:
+            topic_definition['owner'] = owner
+        if txenable != None:                            # It's a boolean!
+            topic_definition['txenable'] = txenable
+        if replication_case:
+            topic_definition['replicationCase'] = replication_case
+        if global_mr_url:
+            topic_definition['globalMrURL'] = global_mr_url
+
+        return self._create_resource(self.topics_path, topic_definition)
+
+    def get_topic_info(self, fqtn):
+        '''
+        Get information about the topic whose fully-qualified name is 'fqtn'
+        '''
+        return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))
+
+    def delete_topic(self, fqtn):
+        '''
+        Delete the topic whose fully qualified name is 'fqtn'
+        '''
+        return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn))
+
+    # Message route clients (publishers and subscribers
+    def create_client(self, fqtn, location, client_role, actions):
+        '''
+        Creates a client authorized to access the topic with fully-qualified name 'fqtn',
+        from the location 'location', using the AAF client role 'client_role'.  The
+        client is authorized to perform actions in the list 'actions'.  (Valid
+        values are 'pub', 'sub', and 'view'
+        '''
+        client_definition = {
+            'fqtn' : fqtn,
+            'dcaeLocationName' : location,
+            'clientRole' : client_role,
+            'action' : actions
+        }
+        return self._create_resource(self.clients_path, client_definition)
+
+    def get_client_info(self, client_id):
+        '''
+        Get client information for the client whose client ID is 'client_id'
+        '''
+        return self._get_resource("{0}/{1}".format(self.clients_path, client_id))
+
+    def delete_client(self, client_id):
+        '''
+        Delete the client whose client ID is 'client_id'
+        '''
+        return self._delete_resource("{0}/{1}".format(self.clients_path, client_id))
+
+    def get_dcae_locations(self, dcae_layer):
+        '''
+        Get the list of location names known to the DMaaP bus controller
+        whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".
+        "dcaeLayer" is "opendcae-central" for central sites.
+        '''
+        # Do these as a separate step so things like 404 get reported precisely
+        locations = self._get_resource(LOCATIONS_PATH)
+        locations.raise_for_status()
+
+        # pull out location names for VALID locations with matching dcae_layer
+        return  map(lambda l: l["dcaeLocationName"],
+                    filter(lambda i : (i['dcaeLayer'] == dcae_layer and i['status'] == 'VALID'),
+                       locations.json()))
+
diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py
new file mode 100644 (file)
index 0000000..130c0bf
--- /dev/null
@@ -0,0 +1,46 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+## Get parameters for accessing the DMaaP controller
+from consulif.consulif import ConsulHandle
+from cloudify.exceptions import NonRecoverableError
+
+CONSUL_HOST = "127.0.0.1"                   # Should always be a local consul agent on Cloudify Manager
+CM_SERVICE_NAME = "cloudify_manager"        # Name under which CM is registered, used as key to get config
+DBC_SERVICE_NAME= "dmaap_bus_controller"    # Name under which the DMaaP bus controller is registered
+
+try:
+    _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None)
+    config = _ch.get_config(CM_SERVICE_NAME)
+    DMAAP_USER = config['dmaap']['username']
+    DMAAP_PASS = config['dmaap']['password']
+    DMAAP_OWNER = config['dmaap']['owner']
+    if 'protocol' in config['dmaap']:
+        DMAAP_PROTOCOL = config['dmaap']['protocol']
+    else:
+        DMAAP_PROTOCOL = 'https'    # Default to https (service discovery should give us this but doesn't
+    if 'path' in config['dmaap']:
+        DMAAP_PATH = config['dmaap']['path']
+    else:
+        DMAAP_PATH = 'webapi'       # SHould come from service discovery but Consul doesn't support it
+
+    service_address, service_port = _ch.get_service(DBC_SERVICE_NAME)
+    DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
+
+except Exception as e:
+        raise NonRecoverableError("Error configuring dmaap plugin: {0}".format(e))
diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py
new file mode 100644 (file)
index 0000000..e043a07
--- /dev/null
@@ -0,0 +1,28 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+# Utility functions
+
+import string
+import random
+
+def random_string(n):
+    '''
+    Create a random alphanumeric string, n characters long.
+    '''
+    return ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n))
diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py
new file mode 100644 (file)
index 0000000..4e0df4d
--- /dev/null
@@ -0,0 +1,198 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Set up a subscriber to a source feed
+def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw):
+    # Add subscriber to source feed
+    add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw)
+    add_sub.raise_for_status()
+    return add_sub.json()
+
+# Set up a publisher to a target feed
+def _set_up_publisher(dmc, target_feed_id, loc):
+    username = random_string(8)
+    userpw = random_string(10)
+    add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw)
+    add_pub.raise_for_status()
+    pub_info = add_pub.json()
+    return pub_info["pubId"], username, userpw
+
+# Get a central location to use when creating a publisher or subscriber
+def _get_central_location(dmc):
+    locations = dmc.get_dcae_locations('opendcae-central')
+    if len(locations) < 1:
+        raise Exception('No central location found for setting up DR bridging')
+    return locations[0]          # We take the first one.  Typically there will be two central locations
+
+
+# Set up a "bridge" between two feeds internal to DCAE
+# A source feed "bridges_to" a target feed, meaning that anything published to
+# the source feed will be delivered to subscribers to the target feed (as well as
+# to subscribers of the source feed).
+#
+# The bridge is established by first adding a publisher to the target feed.  The result of doing this
+# is a publish URL and a set of publication credentials.
+#The publish URL and publication credentials are used to set up a subscriber to the source feed.
+#I.e., we tell the source feed to deliver to an endpoint which is actually a publish
+# endpoint for the target feed.
+@operation
+def create_dr_bridge(**kwargs):
+
+    try:
+
+        # Get source and target feed ids
+        if 'feed_id' in ctx.target.instance.runtime_properties:
+            target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+        else:
+            raise Exception('Target feed has no feed_id property')
+        if 'feed_id' in ctx.source.instance.runtime_properties:
+            source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+        else:
+            raise Exception('Source feed has no feed_id property')
+
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+        # Get a location to use when creating a publisher or subscriber--a central location seems reasonable
+        loc = _get_central_location(dmc)
+
+        ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc))
+
+        # Add publisher to target feed
+        publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+        ctx.logger.info("Added publisher id {0} to  target feed {1} with user {2}".format(publisher_id, target_feed_id, username))
+
+        # Add subscriber to source feed
+        delivery_url = ctx.target.instance.runtime_properties['publish_url']
+        subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw)
+        subscriber_id = subscriber_info["subId"]
+        ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url))
+
+        # Save the publisher and subscriber IDs on the source node, indexed by the target node id
+        ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id}
+
+    except Exception as e:
+        ctx.logger.error("Error creating bridge: {0}".format(e))
+        raise NonRecoverableError(e)
+
+# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system
+# The target feed needs to be provisioned in the external Data Router system.  A publisher
+# to that feed must also be set up in the external Data Router system.  The publish URL,
+# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed.
+# The bridge is established by setting up a subscriber to the internal DCAE source feed using the
+# external feed publisher parameters as delivery parameters for the subscriber.
+@operation
+def create_external_dr_bridge(**kwargs):
+    try:
+
+        # Make sure target feed has full set of properties
+        if 'url' in ctx.target.node.properties and  'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
+            url = ctx.target.node.properties['url']
+            username = ctx.target.node.properties['username']
+            userpw = ctx.target.node.properties['userpw']
+        else:
+            raise Exception ("Target feed missing url, username, and/or user pw")
+
+        # Make sure source feed has a feed ID
+        if 'feed_id' in ctx.source.instance.runtime_properties:
+            source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+        else:
+            raise Exception('Source feed has no feed_id property')
+
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+        # Get a central location to use when creating subscriber
+        loc = _get_central_location(dmc)
+
+        ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc))
+
+        # Create subscription to source feed using properties of the external target feed
+        subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw)
+        subscriber_id = subscriber_info["subId"]
+        ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url))
+
+        # Save the subscriber ID on the source node, indexed by the target node id
+        ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id}
+
+    except Exception as e:
+        ctx.logger.error("Error creating external bridge: {0}".format(e))
+        raise NonRecoverableError(e)
+
+# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed.
+# The bridge is established by creating a publisher on the internal DCAE feed.  Then a subscription
+# to the external feed is created through manual provisioning in the external Data Router system, using
+# the publish URL and the publisher username and password for the internal feed as the delivery parameters
+# for the external subscription.
+# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of
+# bridge will typically have an output that exposes the runtime_property set on the source node in this operation.
+@operation
+def create_external_source_dr_bridge(**kwargs):
+    try:
+        # Get target feed id
+        if 'feed_id' in ctx.target.instance.runtime_properties:
+            target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+        else:
+            raise Exception('Target feed has no feed_id property')
+
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+        # Get a central location to use when creating a publisher
+        loc = _get_central_location(dmc)
+
+        # Create a publisher on the target feed
+        publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+
+        # Save the publisher info on the source node, indexed by the target node
+        ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw}
+
+    except Exception as e:
+        ctx.logger.error("Error creating external source bridge: {0}".format(e))
+
+# Remove the bridge between the relationship source and target.
+# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed.
+# For a bridge to an external target feed, deletes the subscriber on the source feed.
+# For a bridge from an external source feed, deletes the publisher on the target feed.
+@operation
+def remove_dr_bridge(**kwargs):
+    try:
+
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+        if ctx.target.node.id in ctx.source.instance.runtime_properties:
+
+            if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]:
+                # Delete the subscription for this bridge
+                ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']))
+                dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])
+
+            if 'publisher_id' in ctx.source.instance.runtime_properties:
+                # Delete the publisher for this bridge
+                ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']))
+                dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])
+
+        ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id))
+
+    except Exception as e:
+        ctx.logger.error("Error removing bridge: {0}".format(e))
+        # Let the uninstall workflow proceed--don't throw a NonRecoverableError
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
new file mode 100644 (file)
index 0000000..45f8674
--- /dev/null
@@ -0,0 +1,121 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Data Router feeds
+
+@operation
+def create_feed(**kwargs):
+    '''
+    Create a new data router feed
+        Expects "feed_name" to be set in node properties
+        Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
+        (Sets default values if not provided )
+        Sets instance runtime properties:
+            - "feed_id"
+            - "publish_url"
+            - "log_url"
+
+    '''
+    try:
+        # Make sure there's a feed_name
+        if "feed_name" in ctx.node.properties.keys():
+            feed_name = ctx.node.properties["feed_name"]
+        else:
+            feed_name = random_string(12)
+
+        # Set defaults/placeholders for the optional properties for the feed
+        if "feed_version" in ctx.node.properties.keys():
+            feed_version = ctx.node.properties["feed_version"]
+        else:
+            feed_version = "0.0"
+        if "feed_description" in ctx.node.properties.keys():
+            feed_description = ctx.node.properties["feed_description"]
+        else:
+            feed_description = "No description provided"
+        if "aspr_classification" in ctx.node.properties.keys():
+            aspr_classification = ctx.node.properties["aspr_classification"]
+        else:
+            aspr_classification = "unclassified"
+
+        # Make the request to the controller
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
+        f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER)
+        f.raise_for_status()
+
+        # Capture important properties from the result
+        feed = f.json()
+        ctx.instance.runtime_properties["feed_id"] = feed["feedId"]
+        ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+        ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+        ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"]))
+
+    except Exception as e:
+        ctx.logger.error("Error creating feed: {er}".format(er=e))
+        raise NonRecoverableError(e)
+
+@operation
+def get_existing_feed(**kwargs):
+    '''
+    Find information for an existing data router feed
+        Expects "feed_id" to be set in node properties -- uniquely identifies the feed
+        Sets instance runtime properties:
+            - "feed_id"
+            - "publish_url"
+            - "log_url"
+    '''
+    try:
+        # Make the lookup request to the controller
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        f = dmc.get_feed_info(ctx.node.properties["feed_id"])
+        f.raise_for_status()
+
+        # Capture important properties from the result
+        feed = f.json()
+        ctx.instance.runtime_properties["feed_id"] = ctx.node.properties["feed_id"]   # Just to be consistent with newly-created node, above
+        ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+        ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+        ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+
+    except Exception as e:
+        ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+        raise NonRecoverableError(e)
+
+@operation
+def delete_feed(**kwargs):
+    '''
+    Delete a feed
+        Expects "feed_id" to be set on the instance's runtime properties
+    '''
+    try:
+        # Make the lookup request to the controllerid=ctx.node.properties["feed_id"]
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"])
+        f.raise_for_status()
+        ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"]))
+
+    except Exception as e:
+        ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e))
+        # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py
new file mode 100644 (file)
index 0000000..8796354
--- /dev/null
@@ -0,0 +1,211 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Lifecycle operations for DMaaP Data Router
+# publish and subscribe relationships
+
+@operation
+def add_dr_publisher(**kwargs):
+    '''
+    Sets up the source of the publishes_relationship as a publisher to the feed that
+    is the target of the relationship
+        Assumes target (the feed) has the following runtime properties set
+            - feed_id
+            - log_url
+            - publish_url
+        Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
+        This is a dictionary containing one property:
+            - location   (the dcaeLocationName to pass when adding the publisher to the feed)
+        Generates a user name and password that the publisher will need to use when publishing
+        Adds the following properties to the dictionary above:
+             - publish_url
+             - log_url
+             - username
+             - password
+    '''
+    try:
+        # Make sure we have a name under which to store DMaaP configuration
+        # Check early so we don't needlessly create DMaaP entities
+        if 'service_component_name' not in ctx.source.instance.runtime_properties:
+            raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+        target_feed = ctx.target.node.id
+        ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+        # Set up the parameters for the add_publisher request to the DMaaP bus controller
+        feed_id = ctx.target.instance.runtime_properties["feed_id"]
+        location = ctx.source.instance.runtime_properties[target_feed]["location"]
+        username = random_string(8)
+        password = random_string(10)
+
+        # Make the request to add the publisher to the feed
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        add_pub = dmc.add_publisher(feed_id, location, username, password)
+        add_pub.raise_for_status()
+        publisher_info = add_pub.json()
+        publisher_id = publisher_info["pubId"]
+        ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))
+
+        # Set runtime properties on the source
+        ctx.source.instance.runtime_properties[target_feed] = {
+           "publisher_id" : publisher_id,
+           "location" : location,
+           "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
+           "log_url" : ctx.target.instance.runtime_properties["log_url"],
+           "username" : username,
+           "password" : password
+        }
+
+        # Set key in Consul
+        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed])
+
+    except Exception as e:
+        ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
+        raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_publisher(**kwargs):
+    '''
+    Deletes publisher (the source of the publishes_files relationship)
+    from the feed (the target of the relationship).
+    Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
+    when the publisher was added to the feed.
+    '''
+
+    try:
+        # Make sure we have a name under which to store DMaaP configuration
+        # Check early so we don't needlessly create DMaaP entities
+        if 'service_component_name' not in ctx.source.instance.runtime_properties:
+            raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+        # Get the publisher id
+        target_feed = ctx.target.node.id
+        publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
+        ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id, target_feed))
+
+        # Make the request
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        del_result = dmc.delete_publisher(publisher_id)
+        del_result.raise_for_status()
+
+        ctx.logger.info("Deleted publisher {0}".format(publisher_id))
+
+        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+        # Will quietly do nothing if the entry has already been removed
+        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+        ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+    except Exception as e:
+        ctx.logger.error("Error deleting publisher: {er}".format(er=e))
+        # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
+
+@operation
+def add_dr_subscriber(**kwargs):
+    '''
+    Sets up the source of the subscribes_to_files relationship as a subscriber to the
+    feed that is the target of the relationship.
+    Assumes target (the feed) has the following runtime property set
+        - feed_id
+    Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
+    This is a dictionary containing the following properties:
+        - location   (the dcaeLocationName to pass when adding the publisher to the feed)
+        - delivery_url (the URL to which data router will deliver files)
+        - username (the username data router will use when delivering files)
+        - password (the password data router will use when delivering files)
+    Adds a property to the dictionary above:
+        - subscriber_id  (used to delete the subscriber in the uninstall workflow
+    '''
+    try:
+        target_feed = ctx.target.node.id
+        ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+        # Get the parameters for the call
+        feed_id = ctx.target.instance.runtime_properties["feed_id"]
+        location = ctx.source.instance.runtime_properties[target_feed]["location"]
+        delivery_url = ctx.source.instance.runtime_properties[target_feed]["delivery_url"]
+        username = ctx.source.instance.runtime_properties[target_feed]["username"]
+        password = ctx.source.instance.runtime_properties[target_feed]["password"]
+
+        # Make the request to add the subscriber to the feed
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password)
+        add_sub.raise_for_status()
+        subscriber_info = add_sub.json()
+        subscriber_id = subscriber_info["subId"]
+        ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))
+
+        # Add subscriber_id to the runtime properties
+        # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
+        ctx.source.instance.runtime_properties[target_feed] = {
+            "subscriber_id": subscriber_id,
+            "location" : location,
+            "delivery_url" : delivery_url,
+            "username" : username,
+            "password" : password
+        }
+        ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))
+
+        # Set key in Consul
+        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed])
+
+    except Exception as e:
+        ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
+        raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_subscriber(**kwargs):
+    '''
+    Deletes subscriber (the source of the subscribes_to_files relationship)
+    from the feed (the target of the relationship).
+    Assumes that the source node's runtime properties dictionary for the target feed
+    includes 'subscriber_id', set when the publisher was added to the feed.
+    '''
+    try:
+        # Get the subscriber id
+        target_feed = ctx.target.node.id
+        subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
+        ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))
+
+        # Make the request
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        del_result = dmc.delete_subscriber(subscriber_id)
+        del_result.raise_for_status()
+
+        ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))
+
+        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+        # Will quietly do nothing if the entry has already been removed
+        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+        ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+    except Exception as e:
+        ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
+        # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
new file mode 100644 (file)
index 0000000..16ad953
--- /dev/null
@@ -0,0 +1,121 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Message Router topics
+@operation
+def create_topic(**kwargs):
+    '''
+    Creates a message router topic.
+    Allows 'topic_name', 'topic_description', 'txenable', 'replication_case',
+    and 'global_mr_url'  as optional node properties.  If 'topic_name' is not set,
+    generates a random one.
+    Sets 'fqtn' in the instance runtime_properties.
+    Note that 'txenable' is a Message Router flag indicating whether transactions
+    are enabled on the topic.
+    '''
+    try:
+        # Make sure there's a topic_name
+        if "topic_name" in ctx.node.properties:
+            topic_name = ctx.node.properties["topic_name"]
+        else:
+            topic_name = random_string(12)
+
+        # Make sure there's a topic description
+        if "topic_description" in ctx.node.properties:
+            topic_description = ctx.node.properties["topic_description"]
+        else:
+            topic_description = "No description provided"
+
+        # ..and the truly optional setting
+        if "txenable" in ctx.node.properties:
+            txenable = ctx.node.properties["txenable"]
+        else:
+            txenable= False
+
+        if "replication_case" in ctx.node.properties:
+            replication_case = ctx.node.properties["replication_case"]
+        else:
+            replication_case = None
+
+        if "global_mr_url" in ctx.node.properties:
+            global_mr_url = ctx.node.properties["global_mr_url"]
+        else:
+            global_mr_url = None
+
+
+        # Make the request to the controller
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
+        t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
+        t.raise_for_status()
+
+        # Capture important properties from the result
+        topic = t.json()
+        ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
+
+    except Exception as e:
+        ctx.logger.error("Error creating topic: {er}".format(er=e))
+        raise NonRecoverableError(e)
+
+@operation
+def get_existing_topic(**kwargs):
+    '''
+    Get data for an existing feed.
+    Expects 'fqtn' as a node property.
+    Copies this property to 'fqtn' in runtime properties for consistency
+    with a newly-created topic.
+    While there's no real need to make a call to the DMaaP bus controller,
+    we do so just to make sure the fqtn is known to the controller, so we
+    don't run into problems when we try to add a publisher or subscriber later.
+    '''
+    try:
+        fqtn = ctx.node.properties["fqtn"]
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
+        t = dmc.get_topic_info(fqtn)
+        t.raise_for_status()
+
+        ctx.instance.runtime_properties["fqtn"] = ctx.node.properties["fqtn"]
+
+    except Exception as e:
+        ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+        raise NonRecoverableError(e)
+
+@operation
+def delete_topic(**kwargs):
+    '''
+    Delete the topic.  Expects the instance runtime property "fqtn" to have been
+    set when the topic was created.
+    '''
+    try:
+        fqtn = ctx.instance.runtime_properties["fqtn"]
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        ctx.logger.info("Attempting to delete topic {0}".format(fqtn))
+        t = dmc.delete_topic(fqtn)
+        t.raise_for_status()
+
+    except Exception as e:
+        ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+        # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py
new file mode 100644 (file)
index 0000000..ff92d67
--- /dev/null
@@ -0,0 +1,119 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Message router relationship operations
+
+def _add_mr_client(ctype, actions):
+    '''
+    Adds the node represented by 'source' as a client (publisher or subscriber) to
+    to topic represented by the 'target' node.  The list of actions in 'actions'
+    determines whether the client is a subscriber or a publisher.
+
+    Assumes target (the topic) has the following runtime property set
+        - fqtn
+    Assumes source (the client) has a runtime property whose name matches the node name of the feed.
+    This is a dictionary containing the following properties:
+        - location   (the dcaeLocationName to pass when adding the client to the topic)
+        - client_role (the AAF client role under which the client will access the topic)
+    Adds two properties to the dictionary above:
+        - topic_url (the URL that the client can use to access the topic)
+        - client_id  (used to delete the client in the uninstall workflow)
+    '''
+    try:
+        # Make sure we have a name under which to store DMaaP configuration
+        # Check early so we don't needlessly create DMaaP entities
+        if 'service_component_name' not in ctx.source.instance.runtime_properties:
+            raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+        target_topic = ctx.target.node.id           # Key for the source's dictionary with topic-related info
+        fqtn = ctx.target.instance.runtime_properties["fqtn"]
+        ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))
+
+        # Get the parameters needed for adding the client
+        location = ctx.source.instance.runtime_properties[target_topic]["location"]
+        client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]
+
+        # Make the request to add the client to the topic
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        c = dmc.create_client(fqtn, location, client_role, actions)
+        c.raise_for_status()
+        client_info = c.json()
+        client_id = client_info["mrClientId"]
+        topic_url = client_info["topicURL"]
+
+        # Update source's runtime properties
+        #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
+        #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
+        ctx.source.instance.runtime_properties[target_topic] = {
+            "topic_url" : topic_url,
+            "client_id" : client_id,
+            "location" : location,
+            "client_role" : client_role
+        }
+
+        ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))
+
+        # Set key in Consul
+        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])
+
+    except Exception as e:
+        ctx.logger.error("Error adding client to feed: {er}".format(er=e))
+        raise NonRecoverableError(e)
+
+@operation
+def add_mr_publisher(**kwargs):
+    _add_mr_client("publisher", ["view", "pub"])
+
+@operation
+def add_mr_subscriber(**kwargs):
+        _add_mr_client("subscriber", ["view", "sub"])
+
+@operation
+def delete_mr_client(**kwargs):
+    '''
+    Delete the client (publisher or subscriber).
+    Expect property 'client_id' to have been set in the instance's runtime_properties
+    when the client was created.
+    '''
+    try:
+        target_topic = ctx.target.node.id
+        client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
+        ctx.logger.info("Attempting to delete client {0} ".format(client_id))
+        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+        c = dmc.delete_client(client_id)
+        c.raise_for_status()
+
+        ctx.logger.info("Deleted client {0}".format(client_id))
+
+        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+        # Will quietly do nothing if the entry has already been removed
+        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+        ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+    except Exception as e:
+        ctx.logger.error("Error deleting MR client: {er}".format(er=e))
+        # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
diff --git a/dmaap/requirements.txt b/dmaap/requirements.txt
new file mode 100644 (file)
index 0000000..ffdb97f
--- /dev/null
@@ -0,0 +1 @@
+python-consul==0.7.0
diff --git a/dmaap/setup.py b/dmaap/setup.py
new file mode 100644 (file)
index 0000000..0d23668
--- /dev/null
@@ -0,0 +1,16 @@
+from setuptools import setup, find_packages
+
+setup(
+    name = "cloudifydmaapplugin",
+    version = "1.2.0",
+    packages=find_packages(),
+    author = "AT&T",
+    description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."),
+    license = "",
+    keywords = "",
+    url = "",
+    zip_safe=False,
+    install_requires = [
+        "python-consul==0.7.0"
+    ]
+)
diff --git a/dmaap/tests/test_plugin.py b/dmaap/tests/test_plugin.py
new file mode 100644 (file)
index 0000000..b9ebedc
--- /dev/null
@@ -0,0 +1,26 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import pytest
+import requests
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify.exceptions import NonRecoverableError
+
+def test_noop():
+    pass
diff --git a/dmaap/tox.ini b/dmaap/tox.ini
new file mode 100644 (file)
index 0000000..9498c82
--- /dev/null
@@ -0,0 +1,26 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+[tox]
+envlist = py27
+[testenv]
+deps=
+    pytest
+    cloudify==3.4
+    requests
+commands=pytest