spark-flow
This is a library for organizing batch processing pipelines in Apache Spark and handling automatic checkpointing of intermediate results. The core type is a DC (Distributed Collection), which is analogous to a Spark Dataset. DCs have an API encompassing RDDs, Datasets, and DataFrames.
A logical pipeline can be constructed entirely lazily, without a Spark context. Pass a Spark context to any DC to get the corresponding Dataset, RDD, or DataFrame.
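For example, a pipeline can be defined with no Spark context in scope and materialized later in whichever form is convenient. A minimal sketch, using only the parallelize, getRDD, and getDF entry points that appear in the examples below (whether getDF applies to a DC of primitives, rather than one read from a structured source, is an assumption):

import com.bloomberg.sparkflow

// Pure pipeline description: no SparkContext exists yet, nothing runs here.
val evens = sparkflow.parallelize(1 to 100)
  .map(_ + 1)
  .filter(_ % 2 == 0)

// Later, with a SparkContext sc in hand, materialize in the form you need:
// val asRdd = evens.getRDD(sc) // RDD[Int]
// val asDf  = evens.getDF(sc)  // DataFrame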
Building Locally
sbt publishLocal
Include in project
libraryDependencies += "com.bloomberg" %% "spark-flow" % "0.1.0"
Basic Example
import com.bloomberg.sparkflow

// Build the pipeline lazily; nothing is computed yet.
val numbers = sparkflow.parallelize(1 to 10)
val filtered = numbers.filter(_ < 3).checkpoint() // mark this intermediate for automatic checkpointing
val doubled = filtered.map(_ * 2)

// Materialize with an existing SparkContext sc; prints 6.0 (2 + 4).
println(doubled.getRDD(sc).sum())
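The payoff of checkpoint() is reuse: per the project description, the checkpointed intermediate is persisted automatically, so additional branches (or later runs of the same pipeline) start from the saved result instead of recomputing the filter. A sketch adding a second branch on the same checkpointed DC:

// `filtered` is computed once; this branch starts from the saved intermediate.
val tripled = filtered.map(_ * 3)
println(tripled.getRDD(sc).sum()) // prints 9.0 (3 + 6)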
Combined Dataframe API
import com.bloomberg.sparkflow

// Read a CSV through the familiar DataFrame reader API; testFile resolves a
// classpath resource and is defined in the larger example below.
val cars = sparkflow.read
  .format("csv")
  .option("header", "true")
  .load(testFile("cars.csv"))

val makeModel = cars.select("make", "model").checkpoint()
makeModel.getDF(sc).show()
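After getDF(sc), the result is an ordinary Spark DataFrame, so the standard Spark SQL API applies from that point on. For example:

// Standard Spark operations on the materialized DataFrame.
makeModel.getDF(sc)
  .groupBy("make")
  .count()
  .show()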
Larger Example
import com.bloomberg.sparkflow

object FilmsPipeline {

  class FilmMain {
    // Read the raw CSV and map it onto the typed InputFilmRow schema.
    val inputFilmRows = sparkflow.read.format("csv").option("header", "true")
      .load(testFile("Film_Locations_in_San_Francisco.csv"))
      .toDF(FilmsPipeline.columns: _*)
      .as[FilmsPipeline.InputFilmRow]

    // Collapse the rows for each title into one FilmRow; checkpointed so
    // both downstream branches reuse this intermediate.
    val filmRows = inputFilmRows.keyBy(_.title)
      .groupByKey()
      .map(parseFilmRows)
      .checkpoint()

    // Count film appearances per actor and keep the five most frequent.
    val actorMovieCount = filmRows.flatMap(filmRow => filmRow.actors.map((_, 1))).reduceByKey(_ + _)
    val topActors = actorMovieCount.sortBy(_._2, ascending = false).map(_._1).take(5)

    // withResult pairs each FilmRow with the topActors result so the filter
    // can reference it; keep films featuring at least one top actor.
    val filmsWithTopActors = filmRows.withResult(topActors).filter {
      filmRowActors => {
        val (filmRow, actors) = filmRowActors
        filmRow.actors.toSet.intersect(actors.toSet).nonEmpty
      }
    }.map(_._1)
  }

  val columns = Seq(
    "title",
    "release",
    "locations",
    "funFacts",
    "productionCompany",
    "distributor",
    "director",
    "writer",
    "actor1",
    "actor2",
    "actor3"
  )

  case class InputFilmRow(title: String,
                          release: String,
                          locations: String,
                          funFacts: String,
                          productionCompany: String,
                          distributor: String,
                          director: String,
                          writer: String,
                          actor1: String,
                          actor2: String,
                          actor3: String)

  case class FilmRow(title: String,
                     release: String,
                     locations: Seq[String],
                     funFacts: String,
                     productionCompany: String,
                     distributor: String,
                     director: String,
                     writer: String,
                     actors: Seq[String])

  // Merge all rows sharing a title into a single FilmRow, deduplicating
  // locations and dropping empty actor fields.
  val parseFilmRows = (tuple: (String, Seq[InputFilmRow])) => {
    val (title, rows) = tuple
    val firstRow = rows.head
    val locations = rows.map(_.locations).distinct
    val actors = rows.flatMap(row => Seq(row.actor1, row.actor2, row.actor3)).distinct.filter(_ != "")
    FilmRow(
      firstRow.title,
      firstRow.release,
      locations,
      firstRow.funFacts,
      firstRow.productionCompany,
      firstRow.distributor,
      firstRow.director,
      firstRow.writer,
      actors)
  }

  // Resolve a test resource on the classpath to a URL string.
  def testFile(fileName: String): String = {
    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
  }
}
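A driver for the pipeline above might look like the following sketch; the local SparkContext setup is an assumption (any existing context works), and only the final getRDD call triggers computation:

import org.apache.spark.{SparkConf, SparkContext}

object FilmsDriver {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; substitute your own SparkContext.
    val sc = new SparkContext(new SparkConf().setAppName("films").setMaster("local[*]"))

    val pipeline = new FilmsPipeline.FilmMain
    // Materialization happens here; the checkpointed filmRows is computed
    // once and shared by the actor-count and filter branches.
    pipeline.filmsWithTopActors.getRDD(sc).take(10).foreach(println)

    sc.stop()
  }
}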
Upcoming
- GraphX support
- DAG viewer frontend attached to running process
- component / pipeline abstractions
- debug run mode with auto Try wrapped functions and trapped failures
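The planned debug mode is not part of the current API; what "auto Try wrapped functions" would mean can be illustrated in plain Scala (a hypothetical helper, not library code):

import scala.util.Try

// Illustrative only: wrap a user function so a bad record is trapped
// (and could be logged) instead of failing the whole job.
def trapped[A, B](f: A => B): A => Option[B] =
  a => Try(f(a)).toOption

// e.g. a risky per-record computation that would otherwise throw:
// numbers.flatMap(n => trapped((x: Int) => 100 / (x - 5))(n))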