Cluster distributed data store 87/99687/1
authorBrinda Santh <bs2796@att.com>
Tue, 17 Dec 2019 01:59:41 +0000 (20:59 -0500)
committerBrinda Santh <bs2796@att.com>
Tue, 17 Dec 2019 01:59:41 +0000 (20:59 -0500)
Add experimental cluster co-ordination service using Atomic framework.

Included distributed data store creation utilities.

Sample docker compose data cluster between cds controller and resource-resolution instances.

Issue-ID: CCSDK-2000
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I4de00e773a996e08fd1d260fc27ed18832433883

20 files changed:
ms/blueprintsprocessor/application/pom.xml
ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml [new file with mode: 0644]
ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
ms/blueprintsprocessor/application/src/main/docker/startService.sh
ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt [new file with mode: 0644]
ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf [new file with mode: 0644]
ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf [new file with mode: 0644]
ms/blueprintsprocessor/application/src/main/resources/logback.xml
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/pom.xml
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt [new file with mode: 0644]
ms/blueprintsprocessor/parent/pom.xml

index f4d784b..a566c84 100755 (executable)
             <version>0.7.0-SNAPSHOT</version>
         </dependency>
 
+        <!-- Libs -->
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+            <artifactId>atomix-lib</artifactId>
+        </dependency>
+
         <!-- Functions -->
         <dependency>
             <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId>
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
new file mode 100644 (file)
index 0000000..f4b4b79
--- /dev/null
@@ -0,0 +1,85 @@
+version: '3.7'
+
+services:
+  db:
+    image: mariadb:latest
+    container_name: ccsdk-mariadb
+    networks:
+      - cds-network
+    ports:
+      - "3306:3306"
+    volumes:
+      - ~/vm_mysql:/var/lib/mysql
+    restart: always
+    environment:
+      MYSQL_ROOT_PASSWORD: sdnctl
+      MYSQL_DATABASE: sdnctl
+      MYSQL_USER: sdnctl
+      MYSQL_PASSWORD: sdnctl
+  cds-controller-1:
+    depends_on:
+      - db
+    image: onap/ccsdk-blueprintsprocessor:latest
+    container_name: cds-controller-1
+    hostname: cds-controller-1
+    networks:
+      - cds-network
+    ports:
+      - "8000:8080"
+      - "9111:9111"
+    restart: always
+    volumes:
+      - target: /opt/app/onap/blueprints/deploy
+        type: volume
+        source: blueprints-deploy
+      - target: /opt/app/onap/config
+        type: bind
+        source: ./config
+    environment:
+      # Same as hostname and container name
+      CLUSTER_ID: cds-cluster
+      CLUSTER_NODE_ID: cds-controller-1
+      CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+      CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
+      #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+      APPLICATIONNAME: cds-controller
+      BUNDLEVERSION: 1.0.0
+      APP_CONFIG_HOME: /opt/app/onap/config
+      STICKYSELECTORKEY:
+      ENVCONTEXT: dev
+  resource-resolution-1:
+    depends_on:
+      - db
+    image: onap/ccsdk-blueprintsprocessor:latest
+    container_name: resource-resolution-1
+    hostname: resource-resolution-1
+    networks:
+      - cds-network
+    ports:
+      - "8001:8080"
+      - "9112:9111"
+    restart: always
+    volumes:
+      - target: /opt/app/onap/blueprints/deploy
+        type: volume
+        source: blueprints-deploy
+      - target: /opt/app/onap/config
+        type: bind
+        source: ./config
+    environment:
+      CLUSTER_ID: cds-cluster
+      CLUSTER_NODE_ID: resource-resolution-1
+      CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+      CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
+      #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+      APPLICATIONNAME: resource-resolution
+      BUNDLEVERSION: 1.0.0
+      APP_CONFIG_HOME: /opt/app/onap/config
+      STICKYSELECTORKEY:
+      ENVCONTEXT: dev
+volumes:
+  blueprints-deploy:
+
+networks:
+  cds-network:
+    driver: bridge
index 0ff04bf..d877702 100755 (executable)
@@ -1,9 +1,11 @@
-version: '3.3'
+version: '3.7'
 
 services:
   db:
     image: mariadb:latest
     container_name: ccsdk-mariadb
+    networks:
+      - cds-network
     ports:
       - "3306:3306"
     volumes:
@@ -20,6 +22,8 @@ services:
     image: onap/ccsdk-blueprintsprocessor:latest
     container_name: cds-controller-default
     hostname: cds-controller-default
+    networks:
+      - cds-network
     ports:
       - "8000:8080"
       - "9111:9111"
@@ -37,6 +41,8 @@ services:
       - db
     image: onap/ccsdk-commandexecutor:latest
     container_name: bp-command-executor
+    networks:
+      - cds-network
     ports:
       - "50051:50051"
     restart: always
@@ -48,6 +54,8 @@ services:
     image: onap/ccsdk-py-executor
     container_name: py-executor-default
     hostname: py-executor-default
+    networks:
+      - cds-network
     ports:
       - "50052:50052"
     restart: always
@@ -65,3 +73,7 @@ services:
 
 volumes:
   blueprints-deploy:
+
+networks:
+  cds-network:
+    driver: bridge
index 7dcb5ff..f5967dc 100644 (file)
@@ -2,7 +2,7 @@
 
 nodeName=BlueprintsProcessor_1.0.0_$(cat /proc/self/cgroup | grep docker | sed s/\\//\\n/g | tail -1)
 
-echo "APP Config HOME : ${APP_CONFIG_HOME}"
+echo "${CLUSTER_ID}:${CLUSTER_NODE_ID} APP Config HOME : ${APP_CONFIG_HOME}"
 export APP_HOME=/opt/app/onap
 
 keytool -import -noprompt -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer
@@ -18,4 +18,10 @@ exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sour
 -Djava.security.egd=file:/dev/./urandom \
 -DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
 -Dspring.config.location=${APP_CONFIG_HOME}/ \
+-DCLUSTER_ID=${CLUSTER_ID} \
+-DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \
+-DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \
+-DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \
+-DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \
+-DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \
 org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
new file mode 100644 (file)
index 0000000..b78ebf6
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor
+
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.context.event.ApplicationReadyEvent
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Component
+import java.time.Duration
+import javax.annotation.PreDestroy
+
+/**
+ * To Start the cluster, minimum 2 Instances/ Replicas od CDS needed.
+ * All instance such as Blueprintprocessor, ResourceResolution, MessagePrioritization should be in
+ * same cluster and should have same cluster name.
+ *
+ * Data can be shared only between the clusters, outside the cluster data can't be shared.
+ * If cds-controller-x instance wants to share data with resource-resolution-x instance, then it should be in the
+ * same cluster.(cds-cluster) and same network (cds-network)
+ *
+ * Assumptions:
+ * 1. Container, Pod and Host names are same.
+ * 2. Container names should end with sequence number.
+ *      Blueprintprocessor example be : cds-controller-1, cds-controller-2, cds-controller-3
+ *      ResourceResolution example be : resource-resolution-1, resource-resolution-2,  resource-resolution-3
+ * 3. Each contained, should have environment properties CLUSTER_ID, CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS,
+ * CLUSTER_MEMBERS, CLUSTER_STORAGE_PATH
+ *     Example values :
+ *      CLUSTER_ID: cds-cluster
+ *      CLUSTER_NODE_ID: cds-controller-2
+ *      CLUSTER_NODE_ADDRESS: cds-controller-2
+ *      CLUSTER_MEMBERS: cds-controller-1,cds-controller-2,cds-controller-3,resource-resolution-1,resource-resolution-2,resource-resolution-3
+ *      CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
+ *      CLUSTER_CONFIG_FILE:  /opt/app/onap/config/atomix/atomix-multicast.conf
+ * 4. Cluster will be enabled only all the above properties present in the environments.
+ * if CLUSTER_ID is present, then it will try to create cluster.
+ */
+@Component
+open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) {
+
+    private val log = logger(BluePrintProcessorCluster::class)
+
+    @EventListener(ApplicationReadyEvent::class)
+    fun startAndJoinCluster() = runBlocking {
+        val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID)
+
+        if (!clusterId.isNullOrEmpty()) {
+
+            val nodeId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
+                ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ID}")
+
+            val nodeAddress = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
+                ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS}")
+
+            val clusterMembers = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
+                ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_MEMBERS}")
+
+            val clusterMemberList = clusterMembers.split(",").map { it.trim() }.toList()
+
+            val clusterStorage = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
+                ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH}")
+
+            val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
+
+            val clusterInfo = ClusterInfo(
+                id = clusterId, nodeId = nodeId,
+                clusterMembers = clusterMemberList, nodeAddress = nodeAddress,
+                storagePath = clusterStorage,
+                configFile = clusterConfigFile
+            )
+            bluePrintClusterService.startCluster(clusterInfo)
+        } else {
+            log.info(
+                "Cluster is disabled, to enable cluster set the environment " +
+                    "properties[CLUSTER_ID,CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS,CLUSTER_CONFIG_FILE]"
+            )
+        }
+    }
+
+    @PreDestroy
+    fun shutDown() = runBlocking {
+        bluePrintClusterService.shutDown(Duration.ofSeconds(1))
+    }
+}
diff --git a/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf
new file mode 100644 (file)
index 0000000..0fc31e0
--- /dev/null
@@ -0,0 +1,35 @@
+cluster {
+  # Configure the cluster node information.
+  node {
+    id: ${CLUSTER_NODE_ID}
+    address: ${CLUSTER_NODE_ADDRESS}
+  }
+  # Configure the node discovery protocol.
+  discovery {
+    type: bootstrap
+    nodes.1 {
+      id: cds-controller-1
+      address: "cds-controller-1:5679"
+    }
+    nodes.2 {
+      id: resource-reolution-1
+      address: "resource-reolution-1:5679"
+    }
+  }
+}
+# Configure the system management group.
+managementGroup {
+  type: raft
+  name: system
+  partitions: 1
+  members: [${CLUSTER_MEMBERS}]
+  storage {
+    directory: ${CLUSTER_STORAGE_PATH}/data-${CLUSTER_NODE_ID}
+    level: DISK
+  }
+}
+# Configure a Raft partition group.
+partitionGroups.data {
+  type: primary-backup
+  partitions: 7
+}
diff --git a/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf
new file mode 100644 (file)
index 0000000..fd16187
--- /dev/null
@@ -0,0 +1,40 @@
+cluster {
+  # Configure the cluster node information.
+  node {
+    id: ${CLUSTER_NODE_ID}
+    address: ${CLUSTER_NODE_ADDRESS}
+  }
+  # Configure the node discovery protocol.
+  discovery {
+    type: multicast
+  }
+  multicast: {
+    enabled: true
+    port: 54321
+  }
+  # Configure the SWIM membership protocol.
+  protocol {
+    type: swim
+    broadcastUpdates: true
+    gossipInterval: 500ms
+    probeInterval: 2s
+    suspectProbes: 2
+  }
+}
+# Configure the system management group.
+managementGroup {
+  type: raft
+  name: system
+  partitions: 1
+  members: [${CLUSTER_MEMBERS}]
+  storage {
+    directory: ${CLUSTER_STORAGE_PATH}/data-${CLUSTER_NODE_ID}
+    level: DISK
+  }
+}
+
+# Configure a Raft partition group.
+partitionGroups.data {
+  type: primary-backup
+  partitions: 7
+}
index e1389a6..d58be8a 100644 (file)
@@ -39,6 +39,7 @@
     <logger name="org.springframework" level="info"/>
     <logger name="org.springframework.web" level="info"/>
     <logger name="org.hibernate" level="error"/>
+    <logger name="io.atomix" level="warn"/>
     <logger name="org.onap.ccsdk.cds" level="info"/>
 
     <root level="info">
index fcc921c..caf0631 100644 (file)
@@ -217,4 +217,12 @@ object BluePrintConstants {
     const val MODEL_TYPE_ARTIFACT_COMPONENT_JAR = "artifact-component-jar"
 
     val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
+
+    /** Cluster Properties */
+    const val PROPERTY_CLUSTER_ID = "CLUSTER_ID"
+    const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID"
+    const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS"
+    const val PROPERTY_CLUSTER_MEMBERS = "CLUSTER_MEMBERS"
+    const val PROPERTY_CLUSTER_STORAGE_PATH = "CLUSTER_STORAGE_PATH"
+    const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE"
 }
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
new file mode 100644 (file)
index 0000000..d3d6210
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.utils
+
+import java.net.InetAddress
+
+object ClusterUtils {
+
+    /** get the local host name  */
+    fun hostname(): String {
+        val ip = InetAddress.getLocalHost()
+        return ip.hostName
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml
new file mode 100644 (file)
index 0000000..7fa7b45
--- /dev/null
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright © 2018-2019 AT&T Intellectual Property.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+        <artifactId>commons</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>atomix-lib</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Blueprints Processor Atomix Lib</name>
+    <description>Blueprints Processor Atomix Lib</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.atomix</groupId>
+            <artifactId>atomix</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.atomix</groupId>
+            <artifactId>atomix-raft</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.atomix</groupId>
+            <artifactId>atomix-primary-backup</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.atomix</groupId>
+            <artifactId>atomix-gossip</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+            <artifactId>db-lib</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
new file mode 100644 (file)
index 0000000..8ea1593
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+open class BluePrintAtomixLibConfiguration
+
+/**
+ * Exposed Dependency Service by this Atomix Lib Module
+ */
+fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
+    instance(AtomixBluePrintClusterService::class)
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
new file mode 100644 (file)
index 0000000..696d728
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import io.atomix.core.map.DistributedMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> {
+    return if (this != null && this is DistributedMap<*, *>) this
+    else throw BluePrintProcessorException("map is not of type DistributedMap")
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
new file mode 100644 (file)
index 0000000..27921be
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
+
+import io.atomix.cluster.ClusterMembershipEvent
+import io.atomix.core.Atomix
+import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.stereotype.Service
+import java.time.Duration
+import java.util.concurrent.CompletableFuture
+
+@Service
+open class AtomixBluePrintClusterService : BluePrintClusterService {
+
+    private val log = logger(AtomixBluePrintClusterService::class)
+
+    lateinit var atomix: Atomix
+
+    private var joined = false
+
+    override suspend fun startCluster(clusterInfo: ClusterInfo) {
+        log.info(
+            "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
+                "starting with members(${clusterInfo.clusterMembers})"
+        )
+
+        /** Create Atomix cluster either from config file or default multi-cast cluster*/
+        atomix = if (!clusterInfo.configFile.isNullOrEmpty()) {
+            AtomixLibUtils.configAtomix(clusterInfo.configFile!!)
+        } else {
+            AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
+        }
+
+        /** Listen for the member chaneg events */
+        atomix.membershipService.addListener { membershipEvent ->
+            when (membershipEvent.type()) {
+                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
+                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
+                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
+                else -> log.info("***** Member event unknown")
+            }
+        }
+        atomix.start().join()
+        log.info(
+            "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
+                "created successfully...."
+        )
+
+        /** Receive ping from network */
+        val pingHandler = { message: String ->
+            log.info("####### ping message received : $message")
+            CompletableFuture.completedFuture(message)
+        }
+        atomix.communicationService.subscribe("ping", pingHandler)
+
+        /** Ping the network */
+        atomix.communicationService.broadcast(
+            "ping",
+            "ping from node(${clusterInfo.nodeId})"
+        )
+        joined = true
+    }
+
+    override fun clusterJoined(): Boolean {
+        return joined
+    }
+
+    override suspend fun allMembers(): Set<ClusterMember> {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        check(atomix.isRunning) { "cluster is not running" }
+        return atomix.membershipService.members.map {
+            ClusterMember(
+                id = it.id().id(),
+                memberAddress = it.host()
+            )
+        }.toSet()
+    }
+
+    override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        check(atomix.isRunning) { "cluster is not running" }
+
+        return atomix.membershipService.members.filter {
+            it.id().id().startsWith(memberPrefix, true)
+        }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
+            .toSet()
+    }
+
+    override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+        return AtomixLibUtils.distributedMapStore<T>(atomix, name)
+    }
+
+    override suspend fun shutDown(duration: Duration) {
+        val shutDownMilli = duration.toMillis()
+        log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+        delay(shutDownMilli)
+        atomix.stop()
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
new file mode 100644 (file)
index 0000000..6e726a1
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.node.MissingNode
+import com.fasterxml.jackson.databind.node.NullNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import io.atomix.core.Atomix
+import io.atomix.core.map.DistributedMap
+import io.atomix.protocols.backup.MultiPrimaryProtocol
+import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
+import io.atomix.protocols.raft.partition.RaftPartitionGroup
+import io.atomix.utils.net.Address
+import org.jsoup.nodes.TextNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+
+object AtomixLibUtils {
+
+    fun configAtomix(filePath: String): Atomix {
+        val configFile = normalizedFile(filePath)
+        return Atomix.builder(configFile.absolutePath).build()
+    }
+
+    fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+
+        val nodeId = clusterInfo.nodeId
+
+        val raftPartitionGroup = RaftPartitionGroup.builder("system")
+            .withNumPartitions(7)
+            .withMembers(clusterInfo.clusterMembers)
+            .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
+            .build()
+
+        val primaryBackupGroup =
+            PrimaryBackupPartitionGroup.builder("data")
+                .withNumPartitions(31)
+                .build()
+
+        return Atomix.builder()
+            .withMemberId(nodeId)
+            .withAddress(Address.from(clusterInfo.nodeAddress))
+            .withManagementGroup(raftPartitionGroup)
+            .withPartitionGroups(primaryBackupGroup)
+            .withMulticastEnabled()
+            .build()
+    }
+
+    fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+        check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
+
+        val protocol = MultiPrimaryProtocol.builder()
+            .withBackups(2)
+            .build()
+
+        return atomix.mapBuilder<String, T>(storeName)
+            .withProtocol(protocol)
+            .withCacheEnabled()
+            .withValueType(JsonNode::class.java)
+            .withExtraTypes(
+                JsonNode::class.java, TextNode::class.java, ObjectNode::class.java,
+                ArrayNode::class.java, NullNode::class.java, MissingNode::class.java
+            )
+            .build()
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
new file mode 100644 (file)
index 0000000..919d671
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import com.fasterxml.jackson.databind.JsonNode
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import kotlin.test.assertNotNull
+
+class AtomixBluePrintClusterServiceTest {
+    val log = logger(AtomixBluePrintClusterServiceTest::class)
+
+    @Before
+    fun init() {
+        runBlocking {
+            deleteNBDir("target/cluster")
+        }
+    }
+
+    /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/
+    @Test
+    fun testClusterJoin() {
+        runBlocking {
+            val members = arrayListOf("node-5679", "node-5680")
+            val deferred = arrayListOf(5679, 5680).map { port ->
+                async(Dispatchers.IO) {
+                    val nodeId = "node-$port"
+                    log.info("********** Starting node($nodeId) on port($port)")
+                    val clusterInfo = ClusterInfo(
+                        id = "test-cluster", nodeId = nodeId,
+                        clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+                    )
+                    val atomixClusterService = AtomixBluePrintClusterService()
+                    atomixClusterService.startCluster(clusterInfo)
+                    atomixClusterService.atomix
+                }
+            }
+            val atomix = deferred.awaitAll()
+            /** Test Distributed store creation */
+            repeat(2) { storeId ->
+                val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+                assertNotNull(store, "failed to get store")
+                val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+                store1.addListener {
+                    log.info("Received map event : $it")
+                }
+                repeat(10) {
+                    store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
+                }
+                delay(100)
+                store.close()
+            }
+        }
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..f1c625e
--- /dev/null
@@ -0,0 +1,36 @@
+<!--
+  ~ Copyright © 2017-2018 AT&T Intellectual Property.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.springframework.test" level="warn"/>
+    <logger name="org.springframework" level="warn"/>
+    <logger name="org.hibernate" level="info"/>
+    <logger name="io.atomix" level="warn"/>
+    <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+    <root level="warn">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
index 30c34ab..78c5691 100755 (executable)
@@ -34,6 +34,7 @@
 
     <modules>
         <module>processor-core</module>
+        <module>atomix-lib</module>
         <module>db-lib</module>
         <module>rest-lib</module>
         <module>dmaap-lib</module>
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
new file mode 100644 (file)
index 0000000..bbaa427
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+
+import java.time.Duration
+
+interface BluePrintClusterService {
+
+    /** Start the cluster with [clusterInfo], By default clustering service is disabled.
+     * Application module has to start cluster */
+    suspend fun startCluster(clusterInfo: ClusterInfo)
+
+    fun clusterJoined(): Boolean
+
+    /** Returns all the data cluster members */
+    suspend fun allMembers(): Set<ClusterMember>
+
+    /** Returns data cluster members starting with prefix */
+    suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember>
+
+    /** Create and get or get the distributed data map store with [name] */
+    suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
+
+    /** Shut down the cluster with [duration] */
+    suspend fun shutDown(duration: Duration)
+}
+
+data class ClusterInfo(
+    val id: String,
+    var configFile: String? = null,
+    val nodeId: String,
+    val nodeAddress: String,
+    var clusterMembers: List<String>,
+    var storagePath: String
+)
+
+data class ClusterMember(val id: String, val memberAddress: String?)
index b806397..091d9dc 100755 (executable)
@@ -25,6 +25,7 @@
 
     <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
     <artifactId>parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
     <packaging>pom</packaging>
 
     <name>Blueprints Processor Parent</name>
                 <version>${netty-ssl}</version>
             </dependency>
 
+            <!-- Atomix -->
+            <dependency>
+                <groupId>io.atomix</groupId>
+                <artifactId>atomix</artifactId>
+                <version>${atomix.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.atomix</groupId>
+                <artifactId>atomix-raft</artifactId>
+                <version>${atomix.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.atomix</groupId>
+                <artifactId>atomix-primary-backup</artifactId>
+                <version>${atomix.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.atomix</groupId>
+                <artifactId>atomix-gossip</artifactId>
+                <version>${atomix.version}</version>
+            </dependency>
+
             <!-- Adaptors -->
             <dependency>
                 <groupId>org.apache.sshd</groupId>
                 <artifactId>processor-core</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+                <artifactId>atomix-lib</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
                 <artifactId>db-lib</artifactId>