-SERVER=https://knox.example.com:8443/gateway/pnda/deployment
+SERVER=https://knox.example.com:8443/gateway/pnda/repository
APP=src/universal/sparkStreaming/PndaZTTApp/PndaZTTApp.jar
-PACKAGE=pnda-ztt-app-0.0.3.tar.gz
+PACKAGE=pnda-ztt-app-0.0.4.tar.gz
+DEBUG=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
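+# Hypothetical usage: hand $(DEBUG) to the driver JVM for remote debugging,
+# e.g. spark-submit --conf "spark.driver.extraJavaOptions=$(DEBUG)" (port 5005).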
app: ## Build the application jar
sbt assembly
sbt 'universal:packageZipTarball'
deploy: ## Deploy the package to PNDA cluster
- curl -k -u pnda:pnda -X PUT $(SERVER)/packages/$(PACKAGE) --data-binary @target/universal/$(PACKAGE) > /dev/null
+ curl -k -u pnda:pnda -X PUT $(SERVER)/packages/ --upload-file target/universal/$(PACKAGE) > /dev/null
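+# With the trailing slash, curl's --upload-file appends the local file name,
+# so the archive is stored at $(SERVER)/packages/$(PACKAGE).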
list: ## Show the deployed packages
- curl $(SERVER)/packages
+ curl -k -u pnda:pnda $(SERVER)/packages
delete: ## Delete the deployed package
- curl -XDELETE $(SERVER)/packages/$(PACKAGE)
+ curl -k -u pnda:pnda -XDELETE $(SERVER)/packages/$(PACKAGE)
test/PndaZTTApp.jar: $(APP) test/application.properties
cp $< $@
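+# Typical round trip, assuming the targets above:
+#   make app && make deploy && make list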
name := "pnda-ztt-app"
-version := "0.0.3"
+version := "0.0.4"
-scalaVersion := "2.10.6"
+scalaVersion := "2.11.8"
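+// Spark 2.3.x is built against Scala 2.11, hence the cross-build bump.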
enablePlugins(UniversalPlugin)
}
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
- "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
- "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" ,
- "org.apache.kafka" %% "kafka" % "0.8.2.1",
+ "org.apache.spark" %% "spark-core" % "2.3.0" % "provided",
+ "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
+ "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.3" ,
+ "org.apache.kafka" %% "kafka" % "0.11.0.3",
"org.apache.avro" % "avro" % "1.7.7",
"joda-time" % "joda-time" % "2.8.1" % "provided",
"log4j" % "log4j" % "1.2.17" % "provided",
"org.apache.httpcomponents" % "httpcore" % "4.2.5" % "provided",
"org.apache.httpcomponents" % "httpclient" % "4.2.5" % "provided",
- "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.3" % "provided",
- "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.2.3",
- "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.2.3" % "provided",
+ "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7" % "provided",
+ "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7" % "provided",
+ "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.7",
+ "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1" % "provided",
"org.springframework" % "spring-core" % "4.3.10.RELEASE",
- "org.scalatest" % "scalatest_2.10" % "3.0.1" % "test"
+ "org.scalatest" % "scalatest_2.11" % "3.0.5" % "test",
+ "org.scala-lang" % "scala-reflect" % "2.11.8",
+ "org.scala-lang" % "scala-compiler" % "2.11.8" % "provided",
+ "org.scalactic" % "scalactic_2.11" % "3.2.0-SNAP10"
)
EclipseKeys.withSource := true
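+// A minimal sketch (assumption: sbt-assembly is already configured, since the
+// Makefile calls `sbt assembly`): resolve duplicate META-INF entries pulled in
+// by the now-bundled kafka connector.
+assemblyMergeStrategy in assembly := {
+    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+    case _ => MergeStrategy.first
+}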
<configuration>
<artifacts>
<artifact>
- <file>${project.build.directory}/universal/${project.name}-0.0.3.tar.gz</file>
+ <file>${project.build.directory}/universal/${project.name}-0.0.4.tar.gz</file>
<type>tar.gz</type>
<classifier>app</classifier>
</artifact>
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.1")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.1")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
package com.cisco.ztt
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.kafka010.KafkaUtils
import com.cisco.pnda.model.DataPlatformEventCodec
import com.cisco.pnda.model.StaticHelpers
-import kafka.serializer.DefaultDecoder
-import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
class KafkaInput extends Serializable {
def readFromKafka(ssc: StreamingContext, topic: String) = {
val props = AppConfig.loadProperties();
- val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+ val kafkaParams = collection.mutable.Map[String, Object](
+ "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+ "key.deserializer" -> classOf[StringDeserializer],
+ "value.deserializer" -> classOf[ByteArrayDeserializer],
+ "group.id" -> "pnda-ztt-app"
+ )
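+ // Keys are unused here; values stay raw bytes because the payload is an
+ // Avro container decoded below.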
if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
kafkaParams.put("auto.offset.reset", "smallest");
}
- Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+ Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers"))
Holder.logger.info("Registering with kafka using topic " + topic)
- val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
- ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+ val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
+ ssc, PreferConsistent,
+ Subscribe[String, Array[Byte]](Set(topic), kafkaParams))
+ // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
// Decode avro container format
val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
val rawMessages = messages.map(x => {
val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
- val payload = x._2;
+ val payload = x.value;
val dataPlatformEvent = eventDecoder.decode(payload);
dataPlatformEvent;
});
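+ // rawMessages is a DStream[DataPlatformEvent] decoded from the Avro container.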
}
} catch {
case t: Throwable => {
- Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+ Holder.logger.error("Failed to parse JSON: " + t.getClass().getName() + " : " + t.getLocalizedMessage)
(false, Set[Payload]())
}
}
"processing_parallelism" : "1",
"checkpoint_path": "",
"input_topic": "ves.avro",
- "consume_from_beginning": "false"
+ "consume_from_beginning": "false",
+ "spark_version": "2"
}
log4j.additivity.com.cisco.pnda=false
log4j.logger.com.cisco.ztt=TRACE,console
log4j.additivity.com.cisco.ztt=false
+log4j.logger.org.spark_project=INFO,console
+log4j.additivity.org.spark_project=false
+log4j.logger.org.apache=WARN,console
+log4j.additivity.org.apache=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out