Uprev spark/kafka to resolve vulnerabilities 84/82584/3
authorDonald Hunter <donaldh@cisco.com>
Mon, 18 Mar 2019 11:57:39 +0000 (11:57 +0000)
committerDonald Hunter <donaldh@cisco.com>
Mon, 18 Mar 2019 13:36:23 +0000 (13:36 +0000)
Change-Id: Ibc18238a7fe7886c6da6b6d295de0d32d4620aa7
Issue-ID: DCAEGEN2-1207
Signed-off-by: Donald Hunter <donaldh@cisco.com>
pnda-ztt-app/Makefile
pnda-ztt-app/build.sbt
pnda-ztt-app/pom.xml
pnda-ztt-app/project/plugins.sbt
pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
pnda-ztt-app/test/log4j.testing.properties

index 7c08bde..69226cc 100644 (file)
@@ -1,6 +1,7 @@
-SERVER=https://knox.example.com:8443/gateway/pnda/deployment
+SERVER=https://knox.example.com:8443/gateway/pnda/repository
 APP=src/universal/sparkStreaming/PndaZTTApp/PndaZTTApp.jar
-PACKAGE=pnda-ztt-app-0.0.3.tar.gz
+PACKAGE=pnda-ztt-app-0.0.4.tar.gz
+DEBUG=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
 
 app:           ## Build the application jar
        sbt assembly
@@ -9,13 +10,13 @@ package:     ## Build the deployable package
        sbt 'universal:packageZipTarball'
 
 deploy:                ## Deploy the package to PNDA cluster
-       curl -k -u pnda:pnda -X PUT $(SERVER)/packages/$(PACKAGE) --data-binary @target/universal/$(PACKAGE) > /dev/null
+       curl -k -u pnda:pnda -X PUT $(SERVER)/packages/ --upload-file target/universal/$(PACKAGE) > /dev/null
 
 list:          ## Show the deployed packages
        curl $(SERVER)/packages
 
 delete:                ## Delete the deployed package
-       curl -XDELETE $(SERVER)/packages/$(PACKAGE)
+       curl -k -u pnda:pnda -XDELETE $(SERVER)/packages/$(PACKAGE)
 
 test/PndaZTTApp.jar:   $(APP) test/application.properties
        cp $< $@
index 5811d22..24f6bdb 100644 (file)
@@ -1,8 +1,8 @@
 name := "pnda-ztt-app"
 
-version := "0.0.3"
+version := "0.0.4"
 
-scalaVersion := "2.10.6"
+scalaVersion := "2.11.8"
 
 enablePlugins(UniversalPlugin)
 
@@ -15,20 +15,24 @@ packageZipTarball in Universal := {
 }
 
 libraryDependencies ++= Seq(
-    "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
-    "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
-    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" ,
-    "org.apache.kafka" %% "kafka" % "0.8.2.1",
+    "org.apache.spark" %% "spark-core" % "2.3.0" % "provided",
+    "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
+    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.3" ,
+    "org.apache.kafka" %% "kafka" % "0.11.0.3",
     "org.apache.avro" % "avro" % "1.7.7",
     "joda-time" % "joda-time" % "2.8.1" % "provided",
     "log4j" % "log4j" % "1.2.17" % "provided",
     "org.apache.httpcomponents" % "httpcore" % "4.2.5" % "provided",
     "org.apache.httpcomponents" % "httpclient" % "4.2.5" % "provided",
-    "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.3" % "provided",
-    "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.2.3",
-    "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.2.3" % "provided",
+    "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7" % "provided",
+    "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7" % "provided",
+    "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.7",
+    "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1" % "provided",
     "org.springframework" % "spring-core" % "4.3.10.RELEASE",
-    "org.scalatest" % "scalatest_2.10" % "3.0.1" % "test"
+    "org.scalatest" % "scalatest_2.11" % "3.0.5" % "test",
+    "org.scala-lang" % "scala-reflect" % "2.11.8",
+    "org.scala-lang" % "scala-compiler" % "2.11.8" % "provided",
+    "org.scalactic" % "scalactic_2.11" % "3.2.0-SNAP10"
 )
 
 EclipseKeys.withSource := true
index 68417cd..b5ba320 100644 (file)
@@ -65,7 +65,7 @@
             <configuration>
               <artifacts>
                 <artifact>
-                  <file>${project.build.directory}/universal/${project.name}-0.0.3.tar.gz</file>
+                  <file>${project.build.directory}/universal/${project.name}-0.0.4.tar.gz</file>
                   <type>tar.gz</type>
                   <classifier>app</classifier>
                 </artifact>
index b03dd6c..8db1a6c 100644 (file)
@@ -1,3 +1,4 @@
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.1")
 addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
 addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.1")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
index c0bc61e..838b155 100644 (file)
@@ -40,7 +40,7 @@ express or implied.
 package com.cisco.ztt
 
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.kafka010.KafkaUtils
 
 import com.cisco.pnda.model.DataPlatformEventCodec
 import com.cisco.pnda.model.StaticHelpers
@@ -48,6 +48,15 @@ import com.cisco.pnda.model.StaticHelpers
 import kafka.serializer.DefaultDecoder
 import kafka.serializer.StringDecoder
 import org.apache.log4j.Logger
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.spark.streaming.kafka010.PreferConsistent
+import java.util.Arrays
+import scala.collection.JavaConversions
+import java.util.Collections
+import org.springframework.core.serializer.DefaultDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 class KafkaInput extends Serializable {
 
@@ -57,22 +66,29 @@ class KafkaInput extends Serializable {
 
     def readFromKafka(ssc: StreamingContext, topic: String) = {
         val props = AppConfig.loadProperties();
-        val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+        val kafkaParams = collection.mutable.Map[String, Object](
+            "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+            "key.deserializer" -> classOf[StringDeserializer],
+            "value.deserializer" -> classOf[ByteArrayDeserializer],
+            "group.id" -> "pnda-ztt-app"
+        )
         if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
             kafkaParams.put("auto.offset.reset", "smallest");
         }
 
-        Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+        Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers"))
         Holder.logger.info("Registering with kafka using topic " + topic)
 
-        val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
-            ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+        val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
+            ssc, PreferConsistent,
+            Subscribe[String, Array[Byte]](Arrays.asList(topic), JavaConversions.mapAsJavaMap(kafkaParams)))
+        // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
 
         // Decode avro container format
         val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
         val rawMessages = messages.map(x => {
             val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
-            val payload = x._2;
+            val payload = x.value;
             val dataPlatformEvent = eventDecoder.decode(payload);
             dataPlatformEvent;
         });
index b7deadc..4fc57f8 100644 (file)
@@ -62,7 +62,7 @@ class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
             }
         } catch {
             case t: Throwable => {
-                Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+                Holder.logger.error("Failed to parse JSON: " + t.getClass().getName() + " : " + t.getLocalizedMessage)
                 (false, Set[Payload]())
             }
         }
index 2f8ab6a..2301cb4 100644 (file)
@@ -6,5 +6,6 @@
     "processing_parallelism" : "1",
     "checkpoint_path": "",
     "input_topic": "ves.avro",
-    "consume_from_beginning": "false"
+    "consume_from_beginning": "false",
+    "spark_version": "2"
 }
index 1388b20..de05e30 100644 (file)
@@ -4,6 +4,10 @@ log4j.logger.com.cisco.pnda=TRACE,console
 log4j.additivity.com.cisco.pnda=false
 log4j.logger.com.cisco.ztt=TRACE,console
 log4j.additivity.com.cisco.ztt=false
+log4j.logger.org.spark_project=INFO,console
+log4j.additivity.org.spark_project=false
+log4j.logger.org.apache=WARN,console
+log4j.additivity.org.apache=false
  
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.out