PNDA Telemetry app for virtual firewall use case 57/67757/9
author chenxdu <chenxdu@cisco.com>
Wed, 19 Sep 2018 14:42:15 +0000 (16:42 +0200)
committer Donald Hunter <donaldh@cisco.com>
Mon, 1 Oct 2018 09:23:50 +0000 (10:23 +0100)
The telemetry app ingests virtual firewall VES events into
HDFS and the timeseries datastore to support further analytics.

Change-Id: I3a0920d4b416c1c165311ab9ff0fc31d8c96499f
Signed-off-by: chenxdu <chenxdu@cisco.com>
Issue-ID: DCAEGEN2-632
Signed-off-by: Donald Hunter <donaldh@cisco.com>
74 files changed:
.gitignore [new file with mode: 0644]
pnda-ztt-app/.gitignore [new file with mode: 0644]
pnda-ztt-app/Makefile [new file with mode: 0644]
pnda-ztt-app/README.md [new file with mode: 0644]
pnda-ztt-app/assembly.sbt [new file with mode: 0644]
pnda-ztt-app/build.sbt [new file with mode: 0644]
pnda-ztt-app/pom.xml [new file with mode: 0644]
pnda-ztt-app/project/assembly.sbt [new file with mode: 0644]
pnda-ztt-app/project/build.properties [new file with mode: 0644]
pnda-ztt-app/project/plugins.sbt [new file with mode: 0644]
pnda-ztt-app/src/main/resources/dataplatform-raw.avsc [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/fib-summary.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/logging-stats.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/memory-detail.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/memory-summary.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/rib-oper.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/ves-nic.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala [new file with mode: 0644]
pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala [new file with mode: 0644]
pnda-ztt-app/src/test/resources/application.properties [new file with mode: 0644]
pnda-ztt-app/src/test/resources/log4j.testing.properties [new file with mode: 0644]
pnda-ztt-app/src/test/resources/meta/test-one.yaml [new file with mode: 0644]
pnda-ztt-app/src/test/resources/meta/test-three.yaml [new file with mode: 0644]
pnda-ztt-app/src/test/resources/meta/test-two.yaml [new file with mode: 0644]
pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala [new file with mode: 0644]
pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala [new file with mode: 0644]
pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala [new file with mode: 0644]
pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala [new file with mode: 0644]
pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala [new file with mode: 0644]
pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore [new file with mode: 0644]
pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties [new file with mode: 0644]
pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties [new file with mode: 0644]
pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json [new file with mode: 0644]
pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json [new file with mode: 0644]
pnda-ztt-app/test/application.properties [new file with mode: 0644]
pnda-ztt-app/test/log4j.testing.properties [new file with mode: 0644]
pom.xml [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..42f4a1a
--- /dev/null
@@ -0,0 +1,2 @@
+target/
+*~
diff --git a/pnda-ztt-app/.gitignore b/pnda-ztt-app/.gitignore
new file mode 100644 (file)
index 0000000..93ed9eb
--- /dev/null
@@ -0,0 +1,8 @@
+target/
+.project
+.classpath
+.settings/
+/bin/
+.cache-main
+.cache-tests
+PndaZTTApp.jar
diff --git a/pnda-ztt-app/Makefile b/pnda-ztt-app/Makefile
new file mode 100644 (file)
index 0000000..7c08bde
--- /dev/null
@@ -0,0 +1,40 @@
+SERVER=https://knox.example.com:8443/gateway/pnda/deployment
+APP=src/universal/sparkStreaming/PndaZTTApp/PndaZTTApp.jar
+PACKAGE=pnda-ztt-app-0.0.3.tar.gz
+
+app:           ## Build the application jar
+       sbt assembly
+
+package:       ## Build the deployable package
+       sbt 'universal:packageZipTarball'
+
+deploy:                ## Deploy the package to PNDA cluster
+       curl -k -u pnda:pnda -X PUT $(SERVER)/packages/$(PACKAGE) --data-binary @target/universal/$(PACKAGE) > /dev/null
+
+list:          ## Show the deployed packages
+       curl $(SERVER)/packages
+
+delete:                ## Delete the deployed package
+       curl -XDELETE $(SERVER)/packages/$(PACKAGE)
+
+test/PndaZTTApp.jar:   $(APP) test/application.properties
+       cp $< $@
+       jar uvf $@ -C test application.properties
+
+test:  test/PndaZTTApp.jar     ## Run the assembly with spark-submit
+       spark-submit \
+               --driver-java-options "-Dlog4j.configuration=file://$(PWD)/test/log4j.testing.properties" \
+               --class com.cisco.ztt.App \
+               --master 'local[4]' --deploy-mode client \
+               test/PndaZTTApp.jar
+
+clean:         ## Run sbt clean
+       sbt clean
+       rm -f $(APP)
+       rm -f test/PndaZTTApp.jar
+
+help:          ## This help
+       @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
+
+.PHONY: help test
+.DEFAULT_GOAL := help
diff --git a/pnda-ztt-app/README.md b/pnda-ztt-app/README.md
new file mode 100644 (file)
index 0000000..9ab1d53
--- /dev/null
@@ -0,0 +1,13 @@
+# PNDA Zero Touch Telemetry
+
+## Overview
+
+The Zero Touch Telemetry application is a configurable telemetry-to-OpenTSDB solution.  Metadata
+files are used to configure the ZTT application for each telemetry source.
+
+The application receives telemetry events from Kafka, transforms the contents into a set of
+timeseries datapoints and writes them to OpenTSDB.
+
+This application demonstrates ingestion of VES events from a virtual firewall into PNDA. The
+extracted metrics are stored in HDFS and in the timeseries database, enabling direct
+visualization in Grafana as well as downstream Spark-based analytics.
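
Editor's note: a minimal sketch of the flow the README describes, assuming the wiring that
ZttPipeline.create performs (shown in part later in this change). The transform step and the
OpenTSDB address are placeholders, not the app's actual configuration:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream
    import com.cisco.ztt.{KafkaInput, OpenTSDBOutput, Payload}

    object FlowSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("ztt-sketch"), Seconds(2))

        // 1. Avro-encoded telemetry events arrive on the input_topic named in the metadata files.
        val events = new KafkaInput().readFromKafka(ssc, "ves.avro")

        // 2. A metadata-driven Transformer turns each event into timeseries
        //    datapoints wrapped in Payload objects (elided in this sketch).
        val datapoints: DStream[Payload] = ???

        // 3. Datapoints are posted to OpenTSDB; the address is a placeholder.
        new OpenTSDBOutput().putOpentsdb("opentsdb.example.com:4242", datapoints)

        ssc.start()
        ssc.awaitTermination()
      }
    }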
diff --git a/pnda-ztt-app/assembly.sbt b/pnda-ztt-app/assembly.sbt
new file mode 100644 (file)
index 0000000..7c9c71f
--- /dev/null
@@ -0,0 +1,27 @@
+import AssemblyKeys._
+
+assemblySettings
+
+jarName in assembly := "PndaZTTApp.jar"
+
+target in assembly:= file("src/universal/sparkStreaming/PndaZTTApp")
+
+mergeStrategy in assembly := {
+  case PathList("META-INF", "jboss-beans.xml") => MergeStrategy.first
+  case PathList("META-INF", "mailcap") => MergeStrategy.discard
+  case PathList("META-INF", "maven", "org.slf4j", "slf4j-api", xa @ _*) => MergeStrategy.rename
+  case PathList("META-INF", "ECLIPSEF.RSA") => MergeStrategy.discard
+  case PathList("META-INF", "mimetypes.default") => MergeStrategy.first
+  case PathList("com", "datastax", "driver", "core", "Driver.properties") => MergeStrategy.last
+  case PathList("com", "esotericsoftware", "minlog", xx @ _*) => MergeStrategy.first
+  case PathList("plugin.properties") => MergeStrategy.discard
+  case PathList("javax", "activation", xw @ _*) => MergeStrategy.first
+  case PathList("org", "apache", "hadoop", "yarn", xv @ _*) => MergeStrategy.first
+  case PathList("org", "apache", "commons", xz @ _*) => MergeStrategy.first
+  case PathList("org", "jboss", "netty", ya @ _*) => MergeStrategy.first
+  case PathList("org", "apache", "spark", ya @ _*) => MergeStrategy.first
+  case x => {
+    val oldStrategy = (mergeStrategy in assembly).value
+    oldStrategy(x)
+  }
+}
diff --git a/pnda-ztt-app/build.sbt b/pnda-ztt-app/build.sbt
new file mode 100644 (file)
index 0000000..5811d22
--- /dev/null
@@ -0,0 +1,34 @@
+name := "pnda-ztt-app"
+
+version := "0.0.3"
+
+scalaVersion := "2.10.6"
+
+enablePlugins(UniversalPlugin)
+
+packageZipTarball in Universal := {
+  val originalFileName = (packageZipTarball in Universal).value
+  val (base, ext) = originalFileName.baseAndExt
+  val newFileName = file(originalFileName.getParent) / (base + ".tar.gz")
+  IO.move(originalFileName, newFileName)
+  newFileName
+}
+
+libraryDependencies ++= Seq(
+    "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
+    "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
+    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" ,
+    "org.apache.kafka" %% "kafka" % "0.8.2.1",
+    "org.apache.avro" % "avro" % "1.7.7",
+    "joda-time" % "joda-time" % "2.8.1" % "provided",
+    "log4j" % "log4j" % "1.2.17" % "provided",
+    "org.apache.httpcomponents" % "httpcore" % "4.2.5" % "provided",
+    "org.apache.httpcomponents" % "httpclient" % "4.2.5" % "provided",
+    "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.3" % "provided",
+    "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.2.3",
+    "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.2.3" % "provided",
+    "org.springframework" % "spring-core" % "4.3.10.RELEASE",
+    "org.scalatest" % "scalatest_2.10" % "3.0.1" % "test"
+)
+
+EclipseKeys.withSource := true
diff --git a/pnda-ztt-app/pom.xml b/pnda-ztt-app/pom.xml
new file mode 100644 (file)
index 0000000..c792a88
--- /dev/null
@@ -0,0 +1,72 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.onap.dcaegen2.analytics.pnda</groupId>
+  <artifactId>pnda-ztt-app</artifactId>
+  <packaging>pom</packaging>
+  <description>pnda-ztt-app</description>
+  <version>1.0.0-SNAPSHOT</version>
+  <name>pnda-ztt-app</name>
+  <organization>
+    <name>default</name>
+  </organization>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.scala-sbt</groupId>
+      <artifactId>sbt-launch</artifactId>
+      <version>1.0.0-M4</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.6.0</version>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>java</executable>
+              <classpathScope>compile</classpathScope>
+              <arguments>
+                <argument>-classpath</argument>
+                <classpath/>
+                <argument>xsbt.boot.Boot</argument>
+                <argument>assembly</argument>
+                <argument>universal:packageZipTarball</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>${project.build.directory}/universal/${project.name}-0.0.3.tar.gz</file>
+                  <type>tar.gz</type>
+                  <classifier>app</classifier>
+                </artifact>
+              </artifacts>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pnda-ztt-app/project/assembly.sbt b/pnda-ztt-app/project/assembly.sbt
new file mode 100644 (file)
index 0000000..54c3252
--- /dev/null
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/pnda-ztt-app/project/build.properties b/pnda-ztt-app/project/build.properties
new file mode 100644 (file)
index 0000000..62488aa
--- /dev/null
@@ -0,0 +1,3 @@
+sbt.version=0.13.16
+
+resolvers += "Artima Maven Repository" at "https://repo.artima.com/releases"
diff --git a/pnda-ztt-app/project/plugins.sbt b/pnda-ztt-app/project/plugins.sbt
new file mode 100644 (file)
index 0000000..b03dd6c
--- /dev/null
@@ -0,0 +1,3 @@
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.1")
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
+addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.1")
diff --git a/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc
new file mode 100644 (file)
index 0000000..5450771
--- /dev/null
@@ -0,0 +1,10 @@
+{"namespace": "com.cisco.pnda",
+ "type": "record",
+ "name": "PndaRecord",
+ "fields": [
+     {"name": "timestamp",   "type": "long"},
+     {"name": "src",         "type": "string"},
+     {"name": "host_ip",     "type": "string"},
+     {"name": "rawdata",     "type": "bytes"}
+ ]
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml
new file mode 100644 (file)
index 0000000..5d56f77
--- /dev/null
@@ -0,0 +1,17 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: bgp
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-ipv4-bgp-oper:bgp/instances/instance/instance-active/default-vrf/afs/af/neighbor-af-table/neighbor
+    keys:
+      - name: af-name
+        display_name: "Address Family Name"
+      - name: instance-name
+        display_name: "Instance Name"
+      - name: neighbor-address
+        display_name: "Neighbor Address"
+    content:
+      - name: connection-up-count
+        display_name: "Connection Up Count"
diff --git a/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml
new file mode 100644 (file)
index 0000000..0ce8870
--- /dev/null
@@ -0,0 +1,19 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: cpu
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: total-cpu-one-minute
+        display_name: "One-minute CPU Total"
+
+      - name: total-cpu-five-minute
+        display_name: "Five-minute CPU Total"
+
+      - name: total-cpu-fifteen-minute
+        display_name: "Fifteen-minute CPU Total"
diff --git a/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml
new file mode 100644 (file)
index 0000000..2dbb4ae
--- /dev/null
@@ -0,0 +1,27 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: fib
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/fib-summaries/fib-summary
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+      - name: protocol-name
+        display_name: "Protocol Name"
+      - name: vrf-name
+        display_name: "VRF Name"
+    content:
+      - name: extended-prefixes
+        display_name: "Num Extended Prefixes"
+
+      - name: forwarding-elements
+        display_name: "Num Forwarding Elements"
+
+      - name: next-hops
+        display_name: "Num Next Hops"
+
+      - name: routes
+        display_name: "Num Routes"
diff --git a/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml
new file mode 100644 (file)
index 0000000..24fb808
--- /dev/null
@@ -0,0 +1,36 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: interface
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+    keys:
+      - name: interface-name
+        display_name: "Interface Name"
+    content:
+      - name: bytes-received
+        display_name: "Bytes Received"
+        ts_name: bytes-in                  # rename the metric in OpenTSDB
+
+      - name: bytes-sent
+        display_name: "Bytes Sent"
+        ts_name: bytes-out                 # rename the metric in OpenTSDB
+
+      - name: packets-received
+        display_name: "Packets Received"
+
+      - name: packets-sent
+        display_name: "Packets Sent"
+
+      - name: broadcast-packets-received
+        display_name: "Broadcast Packets Received"
+
+      - name: broadcast-packets-sent
+        display_name: "Broadcast Packets Sent"
+
+      - name: multicast-packets-received
+        display_name: "Multicast Packets Received"
+
+      - name: multicast-packets-sent
+        display_name: "Multicast Packets Sent"
diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml
new file mode 100644 (file)
index 0000000..3bc9689
--- /dev/null
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-invmgr-oper:inventory/entities/entity/attributes/inv-basic-bag
+    keys:
+      - name: name
+        display_name: "Entity Name"
+    content:
+      - name: serial-number
+        display_name: "Serial Number"
+
+      - name: description
+        display_name: "Description"
+
+      - name: manufacturer-name
+        display_name: "Manufacturer"
+
+      - name: model-name
+        display_name: "Model Name"
+
+      - name: software-revision
+        display_name: "Software Revision"
+
+      - name: vendor-type
+        display_name: "Vendor OID"
diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml
new file mode 100644 (file)
index 0000000..db6386d
--- /dev/null
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-invmgr-oper:inventory/racks/rack/attributes/inv-basic-bag
+    keys:
+      - name: name
+        display_name: "Entity Name"
+    content:
+      - name: serial-number
+        display_name: "Serial Number"
+
+      - name: description
+        display_name: "Description"
+
+      - name: manufacturer-name
+        display_name: "Manufacturer"
+
+      - name: model-name
+        display_name: "Model Name"
+
+      - name: software-revision
+        display_name: "Software Revision"
+
+      - name: vendor-type
+        display_name: "Vendor OID"
diff --git a/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml
new file mode 100644 (file)
index 0000000..3a5dcba
--- /dev/null
@@ -0,0 +1,16 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: ipv6
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: ipv6.total-packets
+        display_name: "Total IPV6 Packets"
+
+      - name: icmp.total-messages
+        display_name: "Total ICMP Messages"
diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml
new file mode 100644 (file)
index 0000000..2360de6
--- /dev/null
@@ -0,0 +1,58 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/summaries/summary
+    keys:
+      - name: device-id
+        display_name: "Device Id"
+      - name: interface-name
+        display_name: "Interface Name"
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: chassis-id
+        display_name: "Chassis Id"
+
+      - name: device-id
+        display_name: "Device Id"
+
+      - name: port-id-detail
+        display_name: "Port Id"
+
+      - name: receiving-interface-name
+        display_name: "Receiving Interface Name"
+
+      - name: enabled-capabilities
+        display_name: "Enabled Capabilities"
+  - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device
+    keys:
+      - name: device-id
+        display_name: "Device Id"
+      - name: interface-name
+        display_name: "Interface Name"
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: lldp-neighbor.chassis-id
+        display_name: "Chassis Id"
+  - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/details/detail
+    keys:
+      - name: device-id
+        display_name: "Device Id"
+      - name: interface-name
+        display_name: "Interface Name"
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: lldp-neighbor.chassis-id
+        display_name: "Chassis Id"
+  - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/interfaces/interface
+    keys:
+      - name: interface-name
+      - name: node-name
+    content:
+      - name: interface-name
+      - name: if-index
diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml
new file mode 100644 (file)
index 0000000..e29e32c
--- /dev/null
@@ -0,0 +1,23 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: lldp.stats
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/statistics
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: received-packets
+      - name: transmitted-packets
+      - name: aged-out-entries
+      - name: bad-packets
+      - name: discarded-packets
+      - name: discarded-tl-vs
+      - name: encapsulation-errors
+      - name: out-of-memory-errors
+      - name: queue-overflow-errors
+      - name: table-overflow-errors
+      - name: unrecognized-tl-vs
diff --git a/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml
new file mode 100644 (file)
index 0000000..5d12152
--- /dev/null
@@ -0,0 +1,11 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: logging
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics
+    content:
+      - name: buffer-logging-stats.message-count
+        ts_name: message-count
+        display_name: "Serial Number"
diff --git a/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml
new file mode 100644 (file)
index 0000000..6b3f657
--- /dev/null
@@ -0,0 +1,31 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: memory
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/detail
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: allocated-memory
+        display_name: "Allocated Memory"
+
+      - name: free-application-memory
+        display_name: "Free Application Memory"
+
+      - name: free-physical-memory
+        display_name: "Free Physical Memory"
+
+      - name: ram-memory
+        display_name: "RAM Memory"
+
+      - name: program-data
+        display_name: "Program Data"
+
+      - name: program-stack
+        display_name: "Program Stack"
+
+      - name: program-text
+        display_name: "Program Text"
diff --git a/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml
new file mode 100644 (file)
index 0000000..02adef6
--- /dev/null
@@ -0,0 +1,25 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: memory.summary
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/summary
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: allocated-memory
+        display_name: "Allocated Memory"
+
+      - name: free-application-memory
+        display_name: "Free Application Memory"
+
+      - name: free-physical-memory
+        display_name: "Free Physical Memory"
+
+      - name: ram-memory
+        display_name: "RAM Memory"
+
+      - name: system-ram-memory
+        display_name: "System RAM Memopry"
diff --git a/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml
new file mode 100644 (file)
index 0000000..197b0b9
--- /dev/null
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: rib
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-ip-rib-ipv4-oper:rib/vrfs/vrf/afs/af/safs/saf/ip-rib-route-table-names/ip-rib-route-table-name/protocol/bgp/as/information
+    keys:
+      - name: af-name
+        display_name: "Address Family Name"
+      - name: as
+        display_name: "Address Family"
+      - name: route-table-name
+        display_name: "Route table name"
+      - name: saf-name
+        display_name: "Saf name"
+      - name: vrf-name
+        display_name: "Vrf name"
+    content:
+      - name: active-routes-count
+        display_name: "Active Routes Count"
+      - name: instance
+        display_name: "Instance"
+      - name: paths-count
+        display_name: "Paths Count"
+      - name: routes-counts
+        display_name: "Routes Count"
diff --git a/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml
new file mode 100644 (file)
index 0000000..aaf1de8
--- /dev/null
@@ -0,0 +1,12 @@
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: cpu
+
+ves_telemetry:
+  - path: measurementsForVfScalingFields/cpuUsageArray
+    keys:
+      - name: cpuIdentifier
+    content:
+      - name: percentUsage
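
Editor's note: to make the path/keys/content entries concrete, a hedged sketch of how such a
metadata entry addresses a VES measurement event, assuming the standard VES JSON layout (the
app's real traversal lives in VesMapper/VesTransformer, added later in this change) and using
the Jackson dependency already declared in build.sbt:

    import scala.collection.JavaConversions._
    import com.fasterxml.jackson.databind.ObjectMapper

    // Hypothetical VES event fragment matching the path above.
    val json = """{"event": {"measurementsForVfScalingFields": {
      "cpuUsageArray": [{"cpuIdentifier": "cpu0", "percentUsage": 12.5}]}}}"""

    val cpus = new ObjectMapper().readTree(json)
        .path("event").path("measurementsForVfScalingFields").path("cpuUsageArray")

    for (cpu <- cpus) {
      // keys become OpenTSDB tags, content entries become metric values
      println(cpu.path("cpuIdentifier").asText + " -> " + cpu.path("percentUsage").asDouble)
    }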
diff --git a/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml
new file mode 100644 (file)
index 0000000..a6ae3de
--- /dev/null
@@ -0,0 +1,12 @@
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: nic
+
+ves_telemetry:
+  - path: measurementsForVfScalingFields/vNicUsageArray
+    keys:
+      - name: vNicIdentifier
+    content:
+      - name: receivedTotalPacketsDelta
diff --git a/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml
new file mode 100644 (file)
index 0000000..df466c3
--- /dev/null
@@ -0,0 +1,27 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: fib.vrf
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/vrfs/vrf/summary
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+      - name: protocol-name
+        display_name: "Protocol Name"
+      - name: vrf-name
+        display_name: "VRF Name"
+    content:
+      - name: extended-prefixes
+        display_name: "Num Extended Prefixes"
+
+      - name: forwarding-elements
+        display_name: "Num Forwarding Elements"
+
+      - name: next-hops
+        display_name: "Num Next Hops"
+
+      - name: routes
+        display_name: "Num Routes"
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
new file mode 100644 (file)
index 0000000..b143b62
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+  * Name:       StatReporter
+  * Purpose:    Report batch processing metrics to the PNDA metric logger
+  * Author:     PNDA team
+  *
+  * Created:    07/04/2016
+  */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.pnda
+
+import scala.util.control.NonFatal
+import java.io.StringWriter
+import java.io.PrintWriter
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
+import org.apache.log4j.Logger
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+
+class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
+
+    private[this] val logger = Logger.getLogger(getClass().getName())
+
+    override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
+        def doSend(metricName: String, metricValue: String) = {
+            try {
+                val httpClient = new DefaultHttpClient()
+                val post = new HttpPost(metricsUrl)
+                post.setHeader("Content-type", "application/json")
+                val ts = java.lang.System.currentTimeMillis()
+                val body = f"""{
+                    |    "data": [{
+                    |        "source": "application.$appName",
+                    |        "metric": "application.kpi.$appName.$metricName",
+                    |        "value": "$metricValue",
+                    |        "timestamp": $ts%d
+                    |    }],
+                    |    "timestamp": $ts%d
+                    |}""".stripMargin
+
+                logger.debug(body)
+                post.setEntity(new StringEntity(body))
+                val response = httpClient.execute(post)
+                if (response.getStatusLine.getStatusCode() != 200) {
+                    logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode())
+                }
+
+            } catch {
+                case NonFatal(t) => {
+                    logger.error("POST failed: " + metricsUrl)
+                    val sw = new StringWriter
+                    t.printStackTrace(new PrintWriter(sw))
+                    logger.error(sw.toString)
+                }
+            }
+        }
+        doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
+        doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
+        doSend("num-records", batch.batchInfo.numRecords.toString())
+    }
+}
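
Editor's note: usage, as wired up in App.scala later in this change (the URL is an example
value, and ssc is assumed to be the app's StreamingContext):

    // Attach to the StreamingContext so every completed batch posts its KPIs.
    val reporter = new StatReporter("pnda-ztt-app", "http://metrics.example.com:3001")
    ssc.addStreamingListener(reporter)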
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
new file mode 100644 (file)
index 0000000..1a08bd4
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+  * Name:       DataPlatformEvent
+  * Purpose:    Data model class for an Avro event on Kafka
+  * Author:     PNDA team
+  *
+  * Created:    07/04/2016
+  */
+
+package com.cisco.pnda.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class DataPlatformEvent implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+    protected static ObjectMapper _mapper = new ObjectMapper();
+
+    private String _src;
+    private Long _timestamp;
+    private String _hostIp;
+    private String _rawdata;
+
+    public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata)
+    {
+        _src = src;
+        _timestamp = timestamp;
+        _hostIp = host_ip;
+        _rawdata = rawdata;
+    }
+
+    public String getSrc()
+    {
+        return _src;
+    }
+
+    public Long getTimestamp()
+    {
+        return _timestamp;
+    }
+
+    public String getHostIp()
+    {
+        return _hostIp;
+    }
+
+    public String getRawdata()
+    {
+        return _rawdata;
+    }
+
+    @Override
+    public String toString()
+    {
+        try
+        {
+            return _mapper.writeValueAsString(this);
+        }
+        catch (Exception ex)
+        {
+            return null;
+        }
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        boolean result = false;
+        if (other instanceof DataPlatformEvent)
+        {
+            DataPlatformEvent that = (DataPlatformEvent) other;
+            result =   (this.getSrc()       == that.getSrc()       || (this.getSrc()       != null && this.getSrc().equals(that.getSrc())))
+                    && (this.getTimestamp() == that.getTimestamp() || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
+                    && (this.getHostIp()    == that.getHostIp()    || (this.getHostIp()    != null && this.getHostIp().equals(that.getHostIp())))
+                    && (this.getRawdata()   == that.getRawdata()   || (this.getRawdata()  != null && this.getRawdata().equals(that.getRawdata())));
+        }
+        return result;
+
+    }
+
+    public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException
+    {
+        return _mapper.readTree(_rawdata);
+    }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
new file mode 100644 (file)
index 0000000..ef2bc0d
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+  * Name:       DataPlatformEventCodec
+  * Purpose:    Encodes and decodes binary event data from Kafka to/from a DataPlatformEvent object
+  * Author:     PNDA team
+  *
+  * Created:    07/04/2016
+  */
+
+package com.cisco.pnda.model;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.log4j.Logger;
+
+
+public class DataPlatformEventCodec
+{
+    private static final Logger LOGGER = Logger.getLogger(DataPlatformEventCodec.class.getName());
+
+    private Schema.Parser _parser = new Schema.Parser();
+    private DatumReader<GenericRecord> _reader;
+    private DatumWriter<GenericRecord> _writer;
+    private Schema _schema;
+    private String _schemaDef;
+
+    public DataPlatformEventCodec(String schemaDef) throws IOException
+    {
+        _schemaDef = schemaDef;
+        _schema = _parser.parse(schemaDef);
+        _reader = new GenericDatumReader<GenericRecord>(_schema);
+        _writer = new GenericDatumWriter<GenericRecord>(_schema);
+    }
+
+    public DataPlatformEvent decode(byte[] data) throws IOException
+    {
+        try
+        {
+            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+            GenericRecord r = _reader.read(null, decoder);
+            return new DataPlatformEvent(r.get("src").toString(),
+                    (Long) r.get("timestamp"),
+                    r.get("host_ip").toString(),
+                    new String(((ByteBuffer) r.get("rawdata")).array()));
+        }
+        catch(Exception ex)
+        {
+            LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
+            throw new IOException(ex);
+        }
+    }
+
+    final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();
+    public static String hexStr(byte[] bytes) {
+        char[] hexChars = new char[bytes.length * 2];
+        for ( int j = 0; j < bytes.length; j++ ) {
+            int v = bytes[j] & 0xFF;
+            hexChars[j * 2] = hexArray[v >>> 4];
+            hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+        }
+        return new String(hexChars);
+    }
+
+    public byte[] encode(DataPlatformEvent e) throws IOException
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+        GenericRecord datum = new GenericData.Record(_schema);
+        datum.put("src", e.getSrc());
+        datum.put("timestamp", e.getTimestamp());
+        datum.put("host_ip", e.getHostIp());
+        datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
+        _writer.write(datum, encoder);
+        encoder.flush();
+        out.close();
+        return out.toByteArray();
+    }
+}
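
Editor's note: a round-trip sketch with the codec, using the dataplatform-raw.avsc schema
added above (all values are examples):

    import com.cisco.pnda.model.{DataPlatformEvent, DataPlatformEventCodec, StaticHelpers}

    val schema = StaticHelpers.loadResourceFile("dataplatform-raw.avsc")
    val codec  = new DataPlatformEventCodec(schema)

    val event = new DataPlatformEvent(
      "ves.avro", System.currentTimeMillis(), "10.0.0.1", "{\"event\":{}}")

    val bytes   = codec.encode(event)   // Avro binary for the Kafka message value
    val decoded = codec.decode(bytes)
    assert(decoded == event)            // equals() does null-safe field comparison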
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
new file mode 100644 (file)
index 0000000..24a9d35
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+  * Name:       StaticHelpers
+  * Purpose:    Helper functions
+  * Author:     PNDA team
+  *
+  * Created:    07/04/2016
+  */
+
+package com.cisco.pnda.model;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class StaticHelpers {
+       public static String loadResourceFile(String path)
+    {
+        InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
+        try
+        {
+            char[] buf = new char[2048];
+            Reader r = new InputStreamReader(is, "UTF-8");
+            StringBuilder s = new StringBuilder();
+            while (true)
+            {
+                int n = r.read(buf);
+                if (n < 0)
+                    break;
+                s.append(buf, 0, n);
+            }
+            return s.toString();
+        }
+        catch (Exception e)
+        {
+            return null;
+        }
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
new file mode 100644 (file)
index 0000000..6bc2083
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import com.cisco.pnda.StatReporter
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.YamlReader
+import org.apache.log4j.BasicConfigurator
+
+object App {
+
+    private[this] val logger = Logger.getLogger(getClass().getName())
+
+    def main(args: Array[String]) {
+
+        BasicConfigurator.configure();
+
+        val props = AppConfig.loadProperties();
+        val loggerUrl = props.getProperty("environment.metric_logger_url")
+        val appName = props.getProperty("component.application")
+        val checkpointDirectory = props.getProperty("app.checkpoint_path");
+        val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+        val metadata = YamlReader.load()
+        if (metadata.units.length == 0) {
+            logger.error("Trying to run app without metadata")
+            System.exit(1)
+        }
+        val pipeline = new ZttPipeline(metadata)
+
+        // Create the streaming context, or load a saved one from disk
+        val ssc = if (checkpointDirectory.length() > 0)
+            StreamingContext.getOrCreate(checkpointDirectory, pipeline.create) else pipeline.create();
+
+        sys.ShutdownHookThread {
+            logger.info("Gracefully stopping Spark Streaming Application")
+            ssc.stop(true, true)
+            logger.info("Application stopped")
+        }
+
+        if (loggerUrl != null) {
+            logger.info("Reporting stats to url: " + loggerUrl)
+            ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+        }
+        logger.info("Starting spark streaming execution")
+        ssc.start()
+        ssc.awaitTermination()
+    }
+}
\ No newline at end of file
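
Editor's note: for reference, the configuration keys the driver and KafkaInput read from
application.properties (comments are illustrative; actual values ship in
src/universal/sparkStreaming/PndaZTTApp/application.properties):

    val props = AppConfig.loadProperties()
    props.getProperty("environment.metric_logger_url")  // PNDA metric logger endpoint
    props.getProperty("component.application")          // app name used in KPI metric names
    props.getProperty("app.checkpoint_path")            // empty string disables checkpoint recovery
    props.getProperty("app.batch_size_seconds")         // streaming batch interval
    props.getProperty("kafka.brokers")                  // metadata.broker.list / bootstrap.servers
    props.getProperty("kafka.consume_from_beginning")   // "true" sets auto.offset.reset=smallest
    props.getProperty("app.processing_parallelism")     // repartition factor for the input stream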
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
new file mode 100644 (file)
index 0000000..a711cab
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+  * Name:       AppConfig
+  * Purpose:    Load application properties file.
+  * Author:     PNDA team
+  *
+  * Created:    07/04/2016
+  */
+
+package com.cisco.ztt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class AppConfig
+{
+    private static final Logger LOGGER = Logger.getLogger(AppConfig.class);
+
+    private static Properties _properties;
+    private static Object _lock = new Object();
+
+    public static Properties loadProperties()
+    {
+        synchronized (_lock)
+        {
+            if (_properties == null)
+            {
+                _properties = new Properties();
+                try
+                {
+                    InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties");
+                    _properties.load(is);
+                    is.close();
+                    LOGGER.info("Properties loaded");
+                }
+                catch (IOException e)
+                {
+                    LOGGER.info("Failed to load properties", e);
+                    System.exit(1);
+                }
+            }
+            return _properties;
+        }
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
new file mode 100644 (file)
index 0000000..c0bc61e
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name:       KafkaInput
+ * Purpose:    Generate a dstream from Kafka
+ * Author:     PNDA team
+ *
+ * Created:    07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.StringDecoder
+import org.apache.log4j.Logger
+
+class KafkaInput extends Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def readFromKafka(ssc: StreamingContext, topic: String) = {
+        val props = AppConfig.loadProperties();
+        val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+        if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
+            kafkaParams.put("auto.offset.reset", "smallest");
+        }
+
+        Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+        Holder.logger.info("Registering with kafka using topic " + topic)
+
+        val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
+            ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+
+        // Decode avro container format
+        val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+        val rawMessages = messages.map(x => {
+            val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
+            val payload = x._2;
+            val dataPlatformEvent = eventDecoder.decode(payload);
+            dataPlatformEvent;
+        });
+        rawMessages;
+    };
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
new file mode 100644 (file)
index 0000000..1041d00
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+
+
+
+import java.io.StringWriter
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.github.benfradet.spark.kafka.writer.dStreamToKafkaWriter
+
+
+class KafkaOutput extends Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def writeToKafka(output: DStream[Payload]) = {
+
+        val props = AppConfig.loadProperties();
+        val producerConfig = Map(
+            "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+            "key.serializer" -> classOf[StringSerializer].getName,
+            "value.serializer" -> classOf[ByteArraySerializer].getName
+        )
+        output.writeToKafka(
+            producerConfig,
+            s => {
+                val mapper = new ObjectMapper()
+                mapper.registerModule(DefaultScalaModule)
+
+                val out = new StringWriter
+                mapper.writeValue(out, s.datapoint)
+                val json = out.toString()
+
+                val event = new DataPlatformEvent(s.publishSrc, s.timestamp, s.hostIp, json)
+                val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+                val codec = new DataPlatformEventCodec(avroSchemaString);
+
+                new ProducerRecord[String, Array[Byte]](s.publishTopic, codec.encode(event))
+            }
+        )
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
new file mode 100644 (file)
index 0000000..9077986
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name:       OpenTSDBOutput
+ * Purpose:    Write a dstream to OpenTSDB
+ * Author:     PNDA team
+ *
+ * Created:    07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import scala.collection.mutable.ArrayBuffer
+
+class OpenTSDBOutput extends Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def putOpentsdb(opentsdbIP: String,
+                    input: DStream[Payload]): DStream[Payload] = {
+        input.mapPartitions(partition => {
+            val output = partition.grouped(20).flatMap(group => {
+                val data = ArrayBuffer[TimeseriesDatapoint]()
+                val passthru = group.map(item => {
+                    data += item.datapoint
+                    item
+                })
+
+                if (data.length > 0) {
+                    val mapper = new ObjectMapper()
+                    mapper.registerModule(DefaultScalaModule)
+
+                    val out = new StringWriter
+                    mapper.writeValue(out, data)
+                    val json = out.toString()
+
+                    Holder.logger.debug("Posting " + data.length + " datapoints to OpenTSDB")
+                    Holder.logger.debug(json)
+
+                    if (opentsdbIP != null && opentsdbIP.length() > 0) {
+                        val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
+                        try {
+                            val httpClient = new DefaultHttpClient()
+                            val post = new HttpPost(openTSDBUrl)
+                            post.setHeader("Content-type", "application/json")
+                            post.setEntity(new StringEntity(json))
+                            val response = httpClient.execute(post)
+                            // Consume the entity so the connection is released.
+                            EntityUtils.consumeQuietly(response.getEntity())
+                        } catch {
+                            case t: Throwable => {
+                                Holder.logger.warn(t)
+                            }
+                        }
+                    }
+                } else {
+                    Holder.logger.debug("No datapoints to post to OpenTSDB")
+                }
+                passthru
+            })
+            output
+        });
+    }
+}
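For reference, the body posted to http://<opentsdb.ip>/api/put is the Jackson serialization of a batched group of TimeseriesDatapoint objects: a JSON array with metric, value, timestamp and tags fields. Values are string-encoded here because TimeseriesDatapoint stores them as String, which OpenTSDB tolerates. An illustrative batch of one, using values from the tests further down:

{{{
  [{"metric":"demo.total-ipv6-packets","value":"12",
    "timestamp":"1506008887031",
    "tags":{"node-name":"0/RP0/CPU0","host":"IOS-XRv9k-edge-1"}}]
}}}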
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
new file mode 100644 (file)
index 0000000..2961bd0
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class Payload(
+        val publishSrc: String,
+        val publishTopic: String,
+        val hostIp: String,
+        val timestamp: Long,
+        val datapoint: TimeseriesDatapoint) {
+
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
new file mode 100644 (file)
index 0000000..7e447bf
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class TimeseriesDatapoint(
+        val metric: String,
+        val value: String,
+        val timestamp: String,
+        val tags: Map[String, String]) {
+
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
new file mode 100644 (file)
index 0000000..aae1e8c
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+import com.cisco.ztt.meta.Metadata
+import com.cisco.ztt.cisco.xr.telemetry.TelemetryDispatcher
+import com.cisco.ztt.ves.telemetry.VesTransformer
+
+class TransformManager(metadata: Metadata) {
+
+    val transformers: Array[Transformer] = metadata.units.map( unit => {
+
+        unit.format match {
+            case "ves" => new VesTransformer(unit)
+            case "cisco.xr.telemetry" => new TelemetryDispatcher(unit)
+            // Unknown formats fall back to the XR telemetry dispatcher.
+            case _ => new TelemetryDispatcher(unit)
+        }
+    })
+
+    val byTopic = transformers.groupBy( t => { t.inputTopic })
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
new file mode 100644 (file)
index 0000000..861eb5e
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import com.cisco.pnda.model.DataPlatformEvent
+
+trait Transformer {
+    def inputTopic: String
+    def transform(event: DataPlatformEvent): (Boolean, Set[Payload])
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
new file mode 100644 (file)
index 0000000..c3b3532
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name:       ZttPipeline
+ * Purpose:    Set up the spark streaming processing graph.
+ * Author:     PNDA team
+ *
+ * Created:    07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+import com.cisco.ztt.meta.Metadata
+import com.cisco.pnda.model.DataPlatformEvent
+
+class ZttPipeline(metadata: Metadata) extends Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def create() = {
+        val props = AppConfig.loadProperties();
+        val checkpointDirectory = props.getProperty("app.checkpoint_path");
+        val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+        val sparkConf = new SparkConf();
+        Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
+        val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds));
+
+        if (checkpointDirectory.length() > 0) {
+            ssc.checkpoint(checkpointDirectory)
+        }
+
+        val transformManager = new TransformManager(metadata)
+        val streams = transformManager.byTopic.map( x => {
+            val topic = x._1
+            val transformers = x._2
+
+            val inputStream = new KafkaInput().readFromKafka(ssc, topic)
+
+            val timeseriesStream = inputStream.flatMap(dataPlatformEvent => {
+                var handled = false;
+                val datapoints = transformers.flatMap(transformer => {
+                    val (ran, data) = transformer.transform(dataPlatformEvent)
+                    handled |= ran;
+                    data;
+                })
+                if (!handled) {
+                    Holder.logger.info("Did not process " + dataPlatformEvent.getRawdata)
+                }
+                datapoints
+            })
+
+            val outputStream =
+                new OpenTSDBOutput().putOpentsdb(
+                    props.getProperty("opentsdb.ip"),
+                    timeseriesStream);
+
+            new KafkaOutput().writeToKafka(outputStream)
+        });
+
+        ssc;
+    }: StreamingContext
+}
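A hedged sketch of the driver around this pipeline (the actual entry point lives elsewhere in this change; this only shows the intended shape):

{{{
  val metadata = YamlReader.load()            // classpath*:meta/*.yaml
  val ssc = new ZttPipeline(metadata).create()
  ssc.start()
  ssc.awaitTermination()
}}}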
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
new file mode 100644 (file)
index 0000000..00470ff
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Telemetry
+
+class InventoryMapper(config: Telemetry) extends Mapper with Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+        Holder.logger.debug("InventoryMapper is sinking " + config.path)
+        Set[TimeseriesDatapoint]()
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
new file mode 100644 (file)
index 0000000..97f4da3
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.JInt
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods
+import org.json4s.jvalue2extractable
+import org.json4s.string2JsonInput
+import scala.reflect.ManifestFactory.arrayType
+import scala.reflect.ManifestFactory.classType
+import org.json4s.JsonAST.JObject
+
+
+case class TRow(Timestamp: String, Keys: Option[Map[String, String]], Content: Map[String, JValue])
+// Collection timestamps are epoch milliseconds, so they must be Long (they overflow Int).
+case class THeader(node_id_str: String, subscription_id_str: String,
+                encoding_path: String, collection_id: Long, collection_start_time: Long,
+                msg_timestamp: Long, collection_end_time: Long)
+case class TEvent(Source: String, Telemetry: THeader, Rows: Array[TRow])
+
+
+object JsonParser {
+
+    def parse(json: String): TEvent = {
+        implicit val formats = DefaultFormats
+
+        val parsed = JsonMethods.parse(json)
+        val event = parsed.extract[TEvent]
+        event
+    }
+
+    def array(value: JValue): Array[Map[String,JValue]] = {
+        implicit val formats = DefaultFormats
+        val array = value.asInstanceOf[JArray]
+        array.extract[Array[Map[String, JValue]]]
+    }
+
+    def map(value: JValue): Map[String,JValue] = {
+        implicit val formats = DefaultFormats
+        val map = value.asInstanceOf[JObject]
+        map.extract[Map[String, JValue]]
+    }
+
+    // Render an integral JSON value as its decimal string form
+    // (throws if the value is not a JInt).
+    def int(value: JValue): String = {
+        value.asInstanceOf[JInt].num.toString
+    }
+}
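Typical use, as exercised by JsonParserTest later in this change (given a telemetry JSON string `json`):

{{{
  val event = JsonParser.parse(json)              // TEvent: header + rows
  val path  = event.Telemetry.encoding_path       // used to select a Mapper
  val inner = JsonParser.array(event.Rows(0).Content("process-cpu_PIPELINE_EDIT"))
}}}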
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
new file mode 100644 (file)
index 0000000..2983e88
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+
+trait Mapper {
+    def transform(event: TEvent): Set[TimeseriesDatapoint]
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
new file mode 100644 (file)
index 0000000..9f9c775
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+
+
+class NullMapper(path: String) extends Mapper with Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+        Holder.logger.debug("NullMapper is sinking " + path)
+        Set[TimeseriesDatapoint]()
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
new file mode 100644 (file)
index 0000000..b7deadc
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.TimeseriesDatapoint
+import com.cisco.ztt.Transformer
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+
+
+class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def inputTopic: String = { unit.input_topic }
+
+    val transformers = unit.xr_telemetry.map( d => {
+        Holder.logger.warn("Choosing mapper for " + unit.processor)
+        unit.processor match {
+            case "timeseries" => d.path -> new TimeseriesMapper(d, unit.timeseries_namespace)
+            case "inventory" => d.path -> new InventoryMapper(d)
+            case _ => d.path -> new NullMapper(d.path)
+        }
+    }).toMap
+
+    def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+        Holder.logger.trace(rawEvent.getRawdata())
+
+        try {
+            val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+            val event = JsonParser.parse(rawEvent.getRawdata())
+            val path = event.Telemetry.encoding_path
+            if (transformers.contains(path)) {
+                Holder.logger.debug("Transforming for " + path)
+                val datapoints = transformers(path).transform(event)
+                val payloads = datapoints.map(d => {
+                    new Payload(source, unit.output_topic,
+                            rawEvent.getHostIp, rawEvent.getTimestamp, d)
+                })
+                (true, payloads)
+            } else {
+                Holder.logger.trace("No transformer in unit for " + path)
+                (false, Set[Payload]())
+            }
+        } catch {
+            case t: Throwable => {
+                Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+                (false, Set[Payload]())
+            }
+        }
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
new file mode 100644 (file)
index 0000000..6c9ad80
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JObject
+import org.json4s.JsonAST.JValue
+import com.cisco.ztt.meta.Item
+import com.cisco.ztt.meta.Telemetry
+
+class TimeseriesMapper(config: Telemetry, namespace: String) extends Mapper with Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    val wantedKeys = config.keys.getOrElse(Array[Item]()).map( item => {
+        item.name -> item
+    }).toMap
+
+    val wantedValues = config.content.map( item => {
+        item.name -> item
+    }).toMap
+
+    def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+
+        val timeseries = event.Rows.flatMap( row => {
+            val keys = row.Keys
+                    .getOrElse(Map[String, String]())
+                    .filter(k => wantedKeys.contains(k._1))
+                    .map( k => {
+                        val wanted = wantedKeys(k._1)
+                        val name = if (wanted.ts_name != null) wanted.ts_name else k._1
+                        name -> k._2
+                    }).toMap + ("host" -> event.Telemetry.node_id_str)
+
+            // Flatten nested maps into container.key -> value
+            val expanded = row.Content.flatMap( v => {
+                if (v._2.isInstanceOf[JObject]) {
+                    JsonParser.map(v._2).map( kv => {
+                        (v._1 + "." + kv._1) -> kv._2
+                    })
+                } else {
+                    Map[String, JValue](v)
+                }
+            })
+
+            val items = expanded.filter(v => wantedValues.contains(v._1))
+                .map( data => {
+                    val wanted = wantedValues(data._1)
+                    val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else data._1)
+                    val value = JsonParser.int(data._2)
+                    val datapoint = new TimeseriesDatapoint(name, value, row.Timestamp, keys)
+                    datapoint
+                })
+            items.toSet
+        })
+
+        timeseries.toSet
+    }
+
+}
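A worked example of the flattening, using the TelemetryMapperTest fixtures further down: the Content entry "ipv6": {"total-packets": 12} expands to the key ipv6.total-packets; with ts_name total-ipv6-packets and namespace demo, the emitted metric is demo.total-ipv6-packets.

{{{
  val meta = YamlReader.parse(ipv6_yaml)          // fixtures from the test below
  val mapper = new TimeseriesMapper(meta.xr_telemetry(0), "demo")
  val points = mapper.transform(JsonParser.parse(ipv6_json))
  // points: demo.icmp.total-messages and demo.total-ipv6-packets
}}}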
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
new file mode 100644 (file)
index 0000000..84f5bbb
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+class Metadata(val units: Array[Unit]) extends Serializable {
+    def topics() : Set[String] = {
+        units.map(_.input_topic).toSet
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
new file mode 100644 (file)
index 0000000..4b72cb1
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.apache.log4j.Logger
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+// One telemetry mapping unit loaded from YAML (note: shadows scala.Unit where imported).
+case class Unit(
+        format: String,
+        input_topic: String,
+        processor: String,
+        output_topic: String,
+        publish_src: String,
+        timeseries_namespace: String,
+        xr_telemetry: Array[Telemetry],
+        ves_telemetry: Array[Telemetry])
+case class Telemetry(path: String, keys: Option[Array[Item]], content: Array[Item])
+case class Item(name: String, display_name: String, ts_name: String)
+
+object YamlReader {
+
+    private[this] val logger = Logger.getLogger(getClass().getName());
+
+    val mapper: ObjectMapper = new ObjectMapper(new YAMLFactory())
+    mapper.registerModule(DefaultScalaModule)
+
+    def load(pattern: String = "classpath*:meta/*.yaml"): Metadata = {
+        val patternResolver = new PathMatchingResourcePatternResolver()
+        val mappingLocations = patternResolver.getResources(pattern)
+        val units = mappingLocations.map(loc => {
+            logger.info("Reading metadata " + loc)
+            mapper.readValue(loc.getInputStream, classOf[Unit])
+        });
+        new Metadata(units)
+    }
+
+    def parse(yaml: String): Unit = {
+        val meta: Unit = mapper.readValue(yaml, classOf[Unit])
+        meta
+    }
+
+    def dump(meta: Unit) = {
+        println(" input = " + meta.input_topic)
+        println("output = " + meta.output_topic)
+        meta.xr_telemetry.foreach(m => {
+            println("  path = " + m.path)
+            println("keys:")
+            m.keys.getOrElse(Array[Item]()).foreach(item => {
+                println("    " + item.name + " -> " + item.display_name)
+            })
+            println("content:")
+            m.content.foreach(item => {
+                println("    " + item.name + " -> " + item.display_name)
+            })
+        })
+    }
+}
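Units are loaded from the classpath at startup or parsed from a string in tests; dump prints a summary of an XR unit (it assumes xr_telemetry is present):

{{{
  val metadata = YamlReader.load()                // classpath*:meta/*.yaml
  metadata.units.filter(_.xr_telemetry != null).foreach(YamlReader.dump)
}}}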
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
new file mode 100644 (file)
index 0000000..4019262
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.meta.Telemetry
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Item
+
+
+class VesMapper(config: Telemetry, namespace: String) extends Serializable {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = {
+
+        val keys = config.keys.getOrElse(Array[Item]()).map( k => {
+            val value = map.get(k.name).get.toString()
+            k.name -> value
+        }).toMap + ("host" -> host)
+
+        val items = config.content.map( wanted => {
+            val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else wanted.name)
+            val value = map.get(wanted.name).get.toString()
+
+            new TimeseriesDatapoint(name, value, timestamp, keys)
+        })
+
+        items.toSet
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
new file mode 100644 (file)
index 0000000..5eaf8f3
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.Transformer
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+import org.apache.log4j.Logger
+import org.json4s.jackson.JsonMethods
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.JObject
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JArray
+
+
+class VesTransformer(unit: Unit) extends Serializable with Transformer {
+
+    object Holder extends Serializable {
+        @transient lazy val logger = Logger.getLogger(getClass.getName)
+    }
+
+    def inputTopic: String = { unit.input_topic }
+
+    val mappers = unit.ves_telemetry.map(d => {
+        d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace)
+    }) //.toMap
+
+    def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+        val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+        val parsed = JsonMethods.parse(rawEvent.getRawdata)
+
+        if (! parsed.isInstanceOf[JObject]) {
+            Holder.logger.warn("Cannot process parsed JSON")
+            return (false, Set[Payload]())
+        }
+        val eventOption = parsed.asInstanceOf[JObject].values.get("event")
+        if (eventOption.isEmpty) {
+            Holder.logger.warn("VES payload has no top-level 'event' object")
+            return (false, Set[Payload]())
+        }
+        val value = eventOption.get.asInstanceOf[Map[String, JValue]]
+        val header = value.get("commonEventHeader").get.asInstanceOf[Map[String,Any]]
+        val host = header.get("reportingEntityName").get.toString
+        val timestamp = header.get("lastEpochMicrosec").get.toString.dropRight(3)
+
+        val generated = mappers.flatMap(r => {
+            val path = r._1
+            val mapper = r._2
+
+            val datapoints = visit(path, value, mapper, host, timestamp)
+            datapoints.map(d => {
+                new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d)
+            })
+        }).toSet
+        (true, generated)
+    }
+
+    def visit(
+            path: Array[String],
+            map: Map[String, Any],
+            mapper: VesMapper,
+            host: String,
+            timestamp: String): Set[TimeseriesDatapoint] = {
+        if (path.length > 0) {
+            val option = map.get(path.head)
+            option match {
+                case None => {
+                    Holder.logger.warn("VES mapper failed to dereference JSON " + path.head)
+                    return Set[TimeseriesDatapoint]()
+                }
+
+                case _ => {
+                    option.get match {
+                        case l: List[_] => {
+                            val list = l.asInstanceOf[List[Map[String, Any]]]
+                            return list.flatMap(sub => {
+                                visit(path.tail, sub, mapper, host, timestamp)
+                            }).toSet
+                        }
+                        case m: Map[_, _] => {
+                            val sub = m.asInstanceOf[Map[String, Any]]
+                            return visit(path.tail, sub, mapper, host, timestamp)
+                        }
+                        case _ => {
+                            // A scalar leaf before the path is exhausted means the
+                            // configured path does not match the JSON shape; fail soft
+                            // instead of raising a MatchError.
+                            Holder.logger.warn("VES path element " + path.head + " is not a list or object")
+                            return Set[TimeseriesDatapoint]()
+                        }
+                    }
+                }
+            }
+        } else {
+            val datapoints = mapper.transform(map, host, timestamp)
+            return datapoints
+        }
+
+        Set[TimeseriesDatapoint]()
+    }
+}
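To illustrate the recursive walk with a hypothetical VES unit (field names follow the VES common event model referenced above; the real paths live in the meta/ves-*.yaml files): a path of measurementsForVfScaling/cpuUsageArray makes visit() descend from the event object into measurementsForVfScaling, fan out over each cpuUsageArray entry, and hand each leaf map to the VesMapper.

{{{
  // Hypothetical path from a unit definition:
  //   path: measurementsForVfScaling/cpuUsageArray
  // visit(Array("measurementsForVfScaling", "cpuUsageArray"), event, ...)
  //   -> Map step: descend into the nested object
  //   -> List step: recurse once per array element
  //   -> path exhausted: mapper.transform(leaf, host, timestamp)
}}}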
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
new file mode 100644 (file)
index 0000000..57339f8
--- /dev/null
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[DStream]]s to Kafka
+ * @param dStream [[DStream]] to be written to Kafka
+ */
+class DStreamKafkaWriter[T: ClassTag](@transient private val dStream: DStream[T])
+    extends KafkaWriter[T] with Serializable {
+  /**
+   * Write a [[DStream]] to Kafka
+   * @param producerConfig producer configuration for creating KafkaProducer
+   * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+   * @param callback an optional [[Callback]] to be called after each write, default value is None.
+   */
+  override def writeToKafka[K, V](
+    producerConfig: Map[String, Object],
+    transformFunc: T => ProducerRecord[K, V],
+    callback: Option[Callback] = None
+  ): Unit =
+    dStream.foreachRDD { rdd =>
+      val rddWriter = new RDDKafkaWriter[T](rdd)
+      rddWriter.writeToKafka(producerConfig, transformFunc, callback)
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
new file mode 100644 (file)
index 0000000..2fbf6bc
--- /dev/null
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import java.util.concurrent.{Callable, ExecutionException, TimeUnit}
+
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+/** Cache of [[KafkaProducer]]s */
+object KafkaProducerCache {
+  private type ProducerConf = Seq[(String, Object)]
+  private type ExProducer = KafkaProducer[_, _]
+
+  private val removalListener = new RemovalListener[ProducerConf, ExProducer]() {
+    override def onRemoval(notif: RemovalNotification[ProducerConf, ExProducer]): Unit =
+      notif.getValue.close()
+  }
+
+  private val cacheExpireTimeout = 10.minutes.toMillis
+  private val cache = CacheBuilder.newBuilder()
+    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
+    .removalListener(removalListener)
+    .build[ProducerConf, ExProducer]()
+
+  /**
+   * Retrieve a [[KafkaProducer]] in the cache or create a new one
+   * @param producerConfig producer configuration for creating [[KafkaProducer]]
+   * @return a [[KafkaProducer]] already in the cache or a new one
+   */
+  def getProducer[K, V](producerConfig: Map[String, Object]): KafkaProducer[K, V] =
+    try {
+      cache.get(mapToSeq(producerConfig), new Callable[KafkaProducer[K, V]] {
+        override def call(): KafkaProducer[K, V] = new KafkaProducer[K, V](producerConfig.asJava)
+      }).asInstanceOf[KafkaProducer[K, V]]
+    } catch {
+      case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
+        if e.getCause != null => throw e.getCause
+    }
+
+  /**
+   * Flush and close the [[KafkaProducer]] in the cache associated with this config
+   * @param producerConfig producer configuration associated to a [[KafkaProducer]]
+   */
+  def close(producerConfig: Map[String, Object]): Unit = cache.invalidate(mapToSeq(producerConfig))
+
+  private def mapToSeq(m: Map[String, Object]): Seq[(String, Object)] = m.toSeq.sortBy(_._1)
+}
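Design note: producers are keyed by their config (sorted into a stable Seq) and closed ten minutes after last access, so repeated writes from the same executor reuse one KafkaProducer per config. Retrieval is a one-liner:

{{{
  val producer = KafkaProducerCache.getProducer[String, Array[Byte]](producerConfig)
}}}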
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
new file mode 100644 (file)
index 0000000..90d2bb1
--- /dev/null
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used to write DStreams, RDDs and Datasets to Kafka
+ *
+ * Example usage:
+ * {{{
+ *   import com.github.benfradet.spark.kafka.writer.KafkaWriter._
+ *   import org.apache.kafka.common.serialization.StringSerializer
+ *
+ *   val topic = "my-topic"
+ *   val producerConfig = Map(
+ *     "bootstrap.servers" -> "127.0.0.1:9092",
+ *     "key.serializer" -> classOf[StringSerializer].getName,
+ *     "value.serializer" -> classOf[StringSerializer].getName
+ *   )
+ *
+ *   val dStream: DStream[String] = ...
+ *   dStream.writeToKafka(
+ *     producerConfig,
+ *     s => new ProducerRecord[String, String](topic, s)
+ *   )
+ *
+ *   val rdd: RDD[String] = ...
+ *   rdd.writeToKafka(
+ *     producerConfig,
+ *     s => new ProducerRecord[String, String](localTopic, s)
+ *   )
+ *
+ *   val dataset: Dataset[Foo] = ...
+ *   dataset.writeToKafka(
+ *     producerConfig,
+ *     f => new ProducerRecord[String, String](localTopic, f.toString)
+ *   )
+ *
+ *   val dataFrame: DataFrame = ...
+ *   dataFrame.writeToKafka(
+ *     producerConfig,
+ *     r => new ProducerRecord[String, String](localTopic, r.get(0).toString)
+ *   )
+ * }}}
+ * It is also possible to provide a callback for each write to Kafka.
+ *
+ * This is optional and has a value of None by default.
+ *
+ * Example Usage:
+ * {{{
+ *   @transient val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")
+ *
+ *   val dStream: DStream[String] = ...
+ *   dStream.writeToKafka(
+ *     producerConfig,
+ *     s => new ProducerRecord[String, String](topic, s),
+ *     Some(new Callback with Serializable {
+ *       override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
+ *         if (Option(e).isDefined) {
+ *           log.warn("error sending message", e)
+ *         } else {
+ *           log.info(s"write succeeded, offset: ${metadata.offset()}")
+ *         }
+ *       }
+ *     })
+ *   )
+ * }}}
+ */
+abstract class KafkaWriter[T: ClassTag] extends Serializable {
+  /**
+   * Write a DStream, RDD, or Dataset to Kafka
+   * @param producerConfig producer configuration for creating KafkaProducer
+   * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+   * @param callback an optional [[Callback]] to be called after each write, default value is None.
+   */
+  def writeToKafka[K, V](
+    producerConfig: Map[String, Object],
+    transformFunc: T => ProducerRecord[K, V],
+    callback: Option[Callback] = None
+  ): Unit
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
new file mode 100644 (file)
index 0000000..c6f6133
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[RDD]]s to Kafka
+ * @param rdd [[RDD]] to be written to Kafka
+ */
+class RDDKafkaWriter[T: ClassTag](@transient private val rdd: RDD[T])
+    extends KafkaWriter[T] with Serializable {
+  /**
+   * Write a [[RDD]] to Kafka
+   * @param producerConfig producer configuration for creating KafkaProducer
+   * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+   * @param callback an optional [[Callback]] to be called after each write, default value is None.
+   */
+  override def writeToKafka[K, V](
+    producerConfig: Map[String, Object],
+    transformFunc: T => ProducerRecord[K, V],
+    callback: Option[Callback] = None
+  ): Unit =
+    rdd.foreachPartition { partition =>
+      val producer = KafkaProducerCache.getProducer[K, V](producerConfig)
+      partition
+        .map(transformFunc)
+        .foreach(record => producer.send(record, callback.orNull))
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
new file mode 100644 (file)
index 0000000..04f5ba7
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/** Implicit conversions between
+ *  - [[DStream]] -> [[KafkaWriter]]
+ *  - [[RDD]] -> [[KafkaWriter]]
+ */
+package object writer {
+  /**
+   * Convert a [[DStream]] to a [[KafkaWriter]] implicitly
+   * @param dStream [[DStream]] to be converted
+   * @return [[KafkaWriter]] ready to write messages to Kafka
+   */
+  implicit def dStreamToKafkaWriter[T: ClassTag, K, V](dStream: DStream[T]): KafkaWriter[T] =
+    new DStreamKafkaWriter[T](dStream)
+
+  /**
+   * Convert a [[RDD]] to a [[KafkaWriter]] implicitly
+   * @param rdd [[RDD]] to be converted
+   * @return [[KafkaWriter]] ready to write messages to Kafka
+   */
+  implicit def rddToKafkaWriter[T: ClassTag, K, V](rdd: RDD[T]): KafkaWriter[T] =
+    new RDDKafkaWriter[T](rdd)
+
+}
diff --git a/pnda-ztt-app/src/test/resources/application.properties b/pnda-ztt-app/src/test/resources/application.properties
new file mode 100644 (file)
index 0000000..c99a8ad
--- /dev/null
@@ -0,0 +1,8 @@
+app.job_name=collectd.tsdb
+kafka.brokers=localhost:9092
+kafka.zookeeper=localhost:2181
+app.checkpoint_path=/tmp
+app.processing_parallelism=1
+app.batch_size_seconds=5
+kafka.consume_from_beginning=false
+opentsdb.ip=localhost:4242
diff --git a/pnda-ztt-app/src/test/resources/log4j.testing.properties b/pnda-ztt-app/src/test/resources/log4j.testing.properties
new file mode 100644 (file)
index 0000000..c3b266c
--- /dev/null
@@ -0,0 +1,14 @@
+log4j.rootCategory=WARN,console
+log4j.logger.org=WARN
+log4j.logger.com.cisco.pnda=DEBUG,console
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=DEBUG,console
+log4j.additivity.com.cisco.ztt=false
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.immediateFlush=true
+log4j.appender.console.encoding=UTF-8
+
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d %-5p %c - %m%n
diff --git a/pnda-ztt-app/src/test/resources/meta/test-one.yaml b/pnda-ztt-app/src/test/resources/meta/test-one.yaml
new file mode 100644 (file)
index 0000000..9900043
--- /dev/null
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+    keys:
+      - name: interface-name
+        display_name: "Interface Name"
+    content:
+      - name: bytes-received
+        display_name: "Bytes Received"
+
+      - name: bytes-sent
+        display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/resources/meta/test-three.yaml b/pnda-ztt-app/src/test/resources/meta/test-three.yaml
new file mode 100644 (file)
index 0000000..9900043
--- /dev/null
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+    keys:
+      - name: interface-name
+        display_name: "Interface Name"
+    content:
+      - name: bytes-received
+        display_name: "Bytes Received"
+
+      - name: bytes-sent
+        display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/resources/meta/test-two.yaml b/pnda-ztt-app/src/test/resources/meta/test-two.yaml
new file mode 100644 (file)
index 0000000..9900043
--- /dev/null
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+    keys:
+      - name: interface-name
+        display_name: "Interface Name"
+    content:
+      - name: bytes-received
+        display_name: "Bytes Received"
+
+      - name: bytes-sent
+        display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala
new file mode 100644 (file)
index 0000000..c08ae2f
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.telemetry.xr
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.json4s.JsonAST.JArray
+import com.cisco.ztt.cisco.xr.telemetry.JsonParser
+
+class JsonParserTest extends FlatSpec with Matchers {
+  val cpu_json = """
+{
+    "Source": "172.16.1.157:27059",
+    "Telemetry": {
+        "node_id_str": "IOS-XRv9k-edge-1",
+        "subscription_id_str": "cpu",
+        "encoding_path": "Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization",
+        "collection_id": 265673,
+        "collection_start_time": 1505905434090,
+        "msg_timestamp": 1505905434090,
+        "collection_end_time": 1505905434103
+    },
+    "Rows": [
+        {
+            "Timestamp": 1505905434099,
+            "Keys": {
+                "node-name": "0/RP0/CPU0"
+            },
+            "Content": {
+                "process-cpu_PIPELINE_EDIT": [
+                    {
+                        "process-cpu-fifteen-minute": 0,
+                        "process-cpu-five-minute": 0,
+                        "process-cpu-one-minute": 0,
+                        "process-id": 1,
+                        "process-name": "init"
+                    },
+                    {
+                        "process-cpu-fifteen-minute": 0,
+                        "process-cpu-five-minute": 0,
+                        "process-cpu-one-minute": 0,
+                        "process-id": 1544,
+                        "process-name": "bash"
+                    },
+                    {
+                        "process-cpu-fifteen-minute": 0,
+                        "process-cpu-five-minute": 0,
+                        "process-cpu-one-minute": 0,
+                        "process-id": 26436,
+                        "process-name": "sleep"
+                    }
+                ],
+                "total-cpu-fifteen-minute": 6,
+                "total-cpu-five-minute": 6,
+                "total-cpu-one-minute": 6
+            }
+        }
+    ]
+}
+    """
+
+    "JsonParser" should "successfully parse cpu telemetry JSON" in {
+        val event = JsonParser.parse(cpu_json)
+
+        event.Telemetry.subscription_id_str should be ("cpu")
+        event.Rows(0).Keys.size should be (1)
+
+        val subrows = event.Rows(0).Content("process-cpu_PIPELINE_EDIT")
+        val extracted = JsonParser.array(subrows)
+
+        extracted.size should be (3)
+        extracted(0).size should be (5)
+    }
+
+  val null_keys = """
+{
+  "Source": "172.16.1.157:49227",
+  "Telemetry": {
+    "node_id_str": "IOS-XRv9k-edge-1",
+    "subscription_id_str": "Logging",
+    "encoding_path": "Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics",
+    "collection_id": 925712,
+    "collection_start_time": 1507552918199,
+    "msg_timestamp": 1507552918199,
+    "collection_end_time": 1507552918203
+  },
+  "Rows": [
+    {
+      "Timestamp": 1507552918201,
+      "Keys": null,
+      "Content": {
+        "buffer-logging-stats": {
+          "buffer-size": 2097152,
+          "is-log-enabled": "true",
+          "message-count": 221,
+          "severity": "message-severity-debug"
+        }
+      }
+    }
+  ]
+}
+  """
+
+  it should "successfully parse JSON with null keys" in {
+      val event = JsonParser.parse(null_keys)
+
+      event.Rows(0).Keys.size should be (0)
+  }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala
new file mode 100644 (file)
index 0000000..b133c9e
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.telemetry.xr
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import com.cisco.ztt.cisco.xr.telemetry.TimeseriesMapper
+import com.cisco.ztt.meta.YamlReader
+import com.cisco.ztt.cisco.xr.telemetry.JsonParser
+
+class TelemetryMapperTest extends FlatSpec with Matchers {
+
+    val ipv6_yaml = """
+input_topic: telemetry.avro
+output_topic: timeseries
+timeseries_namespace: ipv6
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic
+    keys:
+      - name: node-name
+        display_name: "Node Name"
+    content:
+      - name: ipv6.total-packets
+        display_name: "Total IPV6 Packets"
+        ts_name: total-ipv6-packets
+
+      - name: icmp.total-messages
+        display_name: "Total ICMP Messages"
+"""
+
+    val ipv6_json = """
+{
+  "Source": "172.16.1.157:56197",
+  "Telemetry": {
+    "node_id_str": "IOS-XRv9k-edge-1",
+    "subscription_id_str": "ipv6-io",
+    "encoding_path": "Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic",
+    "collection_id": 282962,
+    "collection_start_time": 1506008887021,
+    "msg_timestamp": 1506008887021,
+    "collection_end_time": 1506008887039
+  },
+  "Rows": [
+    {
+      "Timestamp": 1506008887031,
+      "Keys": {
+        "node-name": "0/RP0/CPU0"
+      },
+      "Content": {
+        "icmp": {
+          "output-messages": 0,
+          "total-messages": 4,
+          "unknown-error-type-messages": 0
+        },
+        "ipv6": {
+          "bad-header-packets": 0,
+          "total-packets": 12,
+          "unknown-protocol-packets": 0
+        },
+        "ipv6-node-discovery": {
+          "received-redirect-messages": 0,
+          "sent-redirect-messages": 0
+        }
+      }
+    }
+  ]
+}
+"""
+
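+    // Only content entries named in the YAML should be emitted, prefixed with
+    // the namespace given to the mapper; ts_name overrides the metric name.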
+    "TelemetryMapper" should "Map to wanted timeseries values" in {
+        val meta = YamlReader.parse(ipv6_yaml)
+        val mapper = new TimeseriesMapper(meta.xr_telemetry(0), "demo")
+
+        val event = JsonParser.parse(ipv6_json)
+        val timeseries = mapper.transform(event).toList
+
+        timeseries.length should be (2)
+        timeseries(0).metric should be ("demo.icmp.total-messages")
+        timeseries(1).metric should be ("demo.total-ipv6-packets")
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala
new file mode 100644 (file)
index 0000000..2ee1749
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.scalatest.Matchers
+import org.scalatest.FlatSpec
+import com.cisco.ztt.TimeseriesDatapoint
+import java.io.StringWriter
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+class DatapointTest extends FlatSpec with Matchers {
+
+    "TimeseriesDatapoint" should "serialize to json" in {
+        val data = Array(
+                new TimeseriesDatapoint("packets-in", "5", "1000000",
+                        Map("host" -> "host1", "interface" -> "GigabitEthernet0/0/0/1")),
+                new TimeseriesDatapoint("packets-out", "5", "1000000",
+                        Map("host" -> "host1", "interface" -> "GigabitEthernet0/0/0/1"))
+        )
+
+        val mapper = new ObjectMapper()
+        mapper.registerModule(DefaultScalaModule)
+
+        val out = new StringWriter
+        mapper.writeValue(out, data)
+        val json = out.toString()
+
+        json should include ("packets-in")
+        json should include ("packets-out")
+    }
+
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala
new file mode 100644 (file)
index 0000000..eabe54e
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+class YamlReaderTest extends FlatSpec with Matchers {
+
+  val interface_yaml = """
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+  - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+    keys:
+      - name: interface-name
+        display_name: "Interface Name"
+
+    content:
+      - name: bytes-received
+        display_name: "Bytes Received"
+
+      - name: bytes-sent
+        display_name: "Bytes Sent"
+
+      - name: packets-received
+        display_name: "Packets Received"
+
+      - name: packets-sent
+        display_name: "Packets Sent"
+"""
+
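+  // keys is optional in the metadata model (hence the Option .get below);
+  // all four content entries should be parsed with their display names.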
+  "YamlReader" should "successfully parse YAML text" in {
+    val meta = YamlReader.parse(interface_yaml)
+
+    meta.input_topic should be ("telemetry")
+    meta.output_topic should be ("timeseries")
+    meta.xr_telemetry.length should be (1)
+    meta.xr_telemetry(0).keys.get.length should be(1)
+    meta.xr_telemetry(0).content.length should be(4)
+  }
+
+  it should "read multiple files from classpath" in {
+    val meta = YamlReader.load("classpath*:meta/test-*.yaml")
+    meta.units.length should be (3)
+  }
+}
diff --git a/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala
new file mode 100644 (file)
index 0000000..58f87fe
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ves.telemetry
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.YamlReader
+import com.cisco.ztt.ves.telemetry.VesTransformer
+import org.apache.log4j.BasicConfigurator
+
+class VesTransformerTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+    before {
+        BasicConfigurator.configure()
+    }
+
+    val yaml = """
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: nic
+
+ves_telemetry:
+  - path: measurementsForVfScalingFields/vNicUsageArray
+    keys:
+      - name: vNicIdentifier
+    content:
+      - name: receivedTotalPacketsDelta
+"""
+    val payload = """{
+    "event": {
+        "commonEventHeader": {
+            "startEpochMicrosec": 1537258053361276,
+            "sourceId": "bab0c4de-cfb8-4a0d-b7e4-a3d4020bb667",
+            "eventId": "TrafficStats_1.2.3.4",
+            "nfcNamingCode": "vVNF",
+            "reportingEntityId": "No UUID available",
+            "internalHeaderFields": {
+                "collectorTimeStamp": "Tue, 09 18 2018 08:01:41 GMT"
+            },
+            "eventType": "HTTP request rate",
+            "priority": "Normal",
+            "version": 3,
+            "reportingEntityName": "fwll",
+            "sequence": 6108,
+            "domain": "measurementsForVfScaling",
+            "lastEpochMicrosec": 1537258063557408,
+            "eventName": "vFirewallBroadcastPackets",
+            "sourceName": "vFW_SINK_VNF2",
+            "nfNamingCode": "vVNF"
+        },
+        "measurementsForVfScalingFields": {
+            "cpuUsageArray": [
+                {
+                    "percentUsage": 0,
+                    "cpuIdentifier": "cpu1",
+                    "cpuIdle": 100,
+                    "cpuUsageSystem": 0,
+                    "cpuUsageUser": 0
+                }
+            ],
+            "measurementInterval": 10,
+            "requestRate": 9959,
+            "vNicUsageArray": [
+                {
+                    "transmittedOctetsDelta": 0,
+                    "receivedTotalPacketsDelta": 0,
+                    "vNicIdentifier": "eth0",
+                    "valuesAreSuspect": "true",
+                    "transmittedTotalPacketsDelta": 0,
+                    "receivedOctetsDelta": 0
+                }
+            ],
+            "measurementsForVfScalingVersion": 2.1
+        }
+    }
+}
+"""
+    "VesTransformer" should "successfully transform a VES event" in {
+        val unit = YamlReader.parse(yaml)
+        val ves = new VesTransformer(unit)
+
+        val event = new DataPlatformEvent("src", System.currentTimeMillis(), "127.0.0.1", payload)
+        val (status, result) = ves.transform(event)
+
+        status should be(true)
+        result.toList.length should be (1)
+        result.toList(0).datapoint.metric should be ("nic.receivedTotalPacketsDelta")
+    }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore
new file mode 100644 (file)
index 0000000..d392f0e
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore
@@ -0,0 +1 @@
+*.jar
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties
new file mode 100644 (file)
index 0000000..8c1b965
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties
@@ -0,0 +1,10 @@
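+# The ${...} placeholders are expected to be substituted with per-deployment
+# values when the application package is deployed to PNDA.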
+kafka.brokers=${environment_kafka_brokers}
+kafka.zookeeper=${environment_kafka_zookeeper}
+app.checkpoint_path=${component_checkpoint_path}
+app.job_name=${component_job_name}
+app.processing_parallelism=${component_processing_parallelism}
+app.batch_size_seconds=${component_batch_size_seconds}
+kafka.consume_from_beginning=${component_consume_from_beginning}
+opentsdb.ip=${environment_opentsdb}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties
new file mode 100644 (file)
index 0000000..6a3cab1
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties
@@ -0,0 +1,13 @@
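+# Application logs roll to a file in the YARN container log directory.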
+log4j.rootLogger=ERROR,rolling
+log4j.logger.com.cisco.pnda=${component.log_level},rolling
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=DEBUG,rolling
+log4j.additivity.com.cisco.ztt=false
+log4j.appender.rolling=org.apache.log4j.RollingFileAppender
+log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
+log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
+log4j.appender.rolling.maxFileSize=50MB
+log4j.appender.rolling.maxBackupIndex=1
+log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/spark.log
+log4j.appender.rolling.encoding=UTF-8
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json
new file mode 100644 (file)
index 0000000..1029987
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json
@@ -0,0 +1,8 @@
+[
+    {
+        "name": "nic.receivedTotalPacketsDelta"
+    },
+    {
+        "name": "cpu.percentUsage"
+    }
+]
\ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
new file mode 100644 (file)
index 0000000..2f8ab6a
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
@@ -0,0 +1,10 @@
+{
+    "main_jar": "PndaZTTApp.jar",
+    "main_class": "com.cisco.ztt.App",
+    "log_level": "INFO",
+    "batch_size_seconds" : "2",
+    "processing_parallelism" : "1",
+    "checkpoint_path": "",
+    "input_topic": "ves.avro",
+    "consume_from_beginning": "false"
+}
diff --git a/pnda-ztt-app/test/application.properties b/pnda-ztt-app/test/application.properties
new file mode 100644 (file)
index 0000000..e7a2923
--- /dev/null
+++ b/pnda-ztt-app/test/application.properties
@@ -0,0 +1,9 @@
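+# Standalone test settings, pointing at a local development Kafka broker and Zookeeper.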
+kafka.brokers=192.168.10.5:9092
+kafka.zookeeper=192.168.10.5:2181
+app.checkpoint_path=
+app.job_name=Zomg
+app.processing_parallelism=1
+app.batch_size_seconds=2
+kafka.consume_from_beginning=false
+opentsdb.ip=
diff --git a/pnda-ztt-app/test/log4j.testing.properties b/pnda-ztt-app/test/log4j.testing.properties
new file mode 100644 (file)
index 0000000..1388b20
--- /dev/null
+++ b/pnda-ztt-app/test/log4j.testing.properties
@@ -0,0 +1,14 @@
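+# Verbose console logging for running the test suite locally.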
+log4j.rootCategory=TRACE,console
+log4j.logger.org=TRACE
+log4j.logger.com.cisco.pnda=TRACE,console
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=TRACE,console
+log4j.additivity.com.cisco.ztt=false
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.immediateFlush=true
+log4j.appender.console.encoding=UTF-8
+
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d %-5p %c - %m%n
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..6b193d5
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+================================================================================
+
+dcae-analytics-pnda
+
+================================================================================
+Copyright (c) 2017 Cisco Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.onap.oparent</groupId>
+    <artifactId>oparent</artifactId>
+    <version>1.2.1</version>
+    <relativePath/>
+  </parent>
+
+  <groupId>org.onap.dcaegen2.analytics.pnda</groupId>
+  <artifactId>pnda-utils</artifactId>
+  <version>1.0.0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
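+  <!-- Aggregator POM; pnda-ztt-app is built as the single submodule. -->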
+  <modules>
+    <module>pnda-ztt-app</module>
+  </modules>
+
+</project>