
A Guide to Reading Elasticsearch Data with Spark

Recently I needed to read Elasticsearch data from a Spark job via Spark SQL and hit a few pitfalls along the way; this post summarizes them.

  • The Spark job is written in Scala, with scala-library at version 2.11.8.

  • Spark dependencies such as spark-core and spark-sql are at version 2.3.2.

  • Elasticsearch data

    schema

    {
      "settings": {
        "number_of_replicas": 1
      },
      "mappings": {
        "label": {
          "properties": {
            "docId": {
              "type": "keyword"
            },
            "labels": {
              "type": "nested",
              "properties": {
                "id": {
                  "type": "long"
                },
                "label": {
                  "type": "keyword"
                }
              }
            },
            "itemId": {
              "type": "long"
            }
          }
        }
      }
    }
    

    sample data

    {
      "took" : 141,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 17370929,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "aen-label-v1",
            "_type" : "label",
            "_id" : "123_ITEM",
            "_score" : 1.0,
            "_source" : {
              "docId" : "123_ITEM",
              "labels" : [
                {
                  "id" : 7378,
                  "label" : "1kg"
                }
              ],
              "itemId" : 123
            }
          },
          {
            "_index" : "aen-label-v1",
            "_type" : "label",
            "_id" : "456_ITEM",
            "_score" : 1.0,
            "_source" : {
              "docId" : "456_ITEM",
              "labels" : [
                {
                  "id" : 7378,
                  "label" : "2kg"
                }
              ],
              "itemId" : 456
            }
          }
        ]
      }
    }
    

Since we are using Spark SQL, we naturally need its corresponding dependencies:

dependencies {
  implementation 'org.apache.spark:spark-core_2.11:2.3.2'
  implementation 'org.apache.spark:spark-sql_2.11:2.3.2'
}

As for the ES-side libraries, the official documentation says that to access ES from Spark you need to put the elasticsearch-hadoop dependency on the classpath of the Spark job, which in practice means adding it to the job project's dependencies. The latest version currently available in our company's Nexus is 7.15.0, and since we manage dependencies with Gradle, it is added like this:

dependencies {
  implementation 'org.elasticsearch:elasticsearch-hadoop:7.15.0'
}

Depending on the resource manager, Spark can run in two modes, local mode and cluster mode, selected via the --master parameter. In local mode no external Spark cluster is needed; Spark runs everything on a single machine, which is very convenient for local testing. For Spark SQL this just means using the local master when creating the SparkSession:

import com.google.common.base.Joiner
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{Row, SparkSession}

class MyUtils extends Serializable {
  def esHost() = "es.sherlockyb.club"
  
  // local mode
  def getLocalSparkSession: SparkSession = SparkSession.builder()
    .master("local")
    .getOrCreate()
  
  // cluster mode
  def getSparkSession: SparkSession = SparkSession.builder()
    .enableHiveSupport()
    .config("spark.sql.broadcastTimeout", "3600")
    .getOrCreate()
}
object LocalTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    new LocalTest().run()
  }
}

class LocalTest {
  def run(): Unit = {
    val myUtils = new MyUtils
    val spark = myUtils.getLocalSparkSession
    import spark.implicits._

    var start = System.currentTimeMillis()
    val attributeId = 7378L
    val labelNames = Array("aen-label-retail", "aen-label-seller")
    spark.read
      .format("es")
      .option("es.nodes", myUtils.esHost())
      .option("es.port", "9200")
      .option("es.nodes.wan.only", value = true)
      .option("es.resource", Joiner.on(",").join(java.util.Arrays.asList(labelNames:_*)) + "/label")
      .option("es.scroll.size", 2000)
      .load()
      .createOrReplaceTempView("temp_labels")
    
    val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)")
    val newDf = sqlDf
      .map(row => {
        val labels = row.getAs[Seq[Row]]("labels")
        val labelValue = labels.find(p => p.getAs[Long]("id") == attributeId).map(p => p.getAs[String]("label"))

        (row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
      })
      .withColumn("final_result", lit("PASS"))
      .toDF("itemId", "attributeId", "label", "final_result")

    val finalDf = newDf.toDF("itemId", "attributeId", "label", "result")
    finalDf.printSchema()
    finalDf.show()
    
    var emptyDf = newDf
      .filter(col("label").isNotNull)
      .toDF("itemId", "attributeId", "label", "result")
    emptyDf = emptyDf.union(finalDf)
    emptyDf.printSchema()
    emptyDf.show()

    emptyDf.filter(col("itemId") === 6238081929L and col("label").notEqual(col("result")))
      .show()

    val attributeTypeIds = Array.fill(3)(100)
    val attributeTypeIdsStr = Joiner.on(",").join(java.util.Arrays.asList(attributeTypeIds:_*))
    println(attributeTypeIdsStr)


    import scala.collection.JavaConverters._
    // trainItemIds: item ids to exclude, as a java.util.List[java.lang.Long] (placeholder values here)
    val trainItemIds: java.util.List[java.lang.Long] =
      java.util.Arrays.asList(java.lang.Long.valueOf(123L), java.lang.Long.valueOf(456L))
    emptyDf = emptyDf.filter(!col("itemId").isin(trainItemIds.asScala.map(Long2long).toList:_*))
    emptyDf.show(false)
  }
}

Spark SQL Data Sources

Spark SQL reads all kinds of data sources through the DataFrameReader class: files in Parquet, ORC, JSON, or CSV format, Hive tables, and other databases. Elasticsearch is just one more data source. With DataFrameReader you specify the source format via format(...), customize source-specific settings via option(...), and finally call load() to produce a DataFrame, which is simply a type alias for Dataset[Row]. Once you have a DataFrame you can register a temporary view and query the data with SQL.

Before Spark 1.5, the source name passed to format(...) had to be the fully qualified package name org.elasticsearch.spark.sql; from Spark 1.5 onward it can be shortened to es.
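For illustration, a minimal sketch against the sample index above (assuming an existing SparkSession named spark and the elasticsearch-hadoop jar on the classpath); the two forms load the same DataFrame:

// full source name, required before Spark 1.5
val dfFull = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "es.sherlockyb.club")
  .option("es.port", "9200")
  .option("es.nodes.wan.only", "true")
  .load("aen-label-v1/label")

// short alias available from Spark 1.5 onward
val dfEs = spark.read
  .format("es")
  .option("es.nodes", "es.sherlockyb.club")
  .option("es.port", "9200")
  .option("es.nodes.wan.only", "true")
  .load("aen-label-v1/label")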

Commonly used DataFrame APIs in Spark SQL

  • df.printSchema() prints the schema.
  • df.show() lists the data; by default it shows the first 20 rows with long values truncated, and df.show(false) disables the truncation.
  • df.createOrReplaceTempView("view_name") registers a temporary view for subsequent SQL queries.
  • df.withColumn() adds a new column or replaces an existing one.
    • df.withColumn("final_result", lit("PASS")) adds a constant column via lit.
  • df.filter(col("label").isNotNull) filters rows by the given condition.
  • df.dropDuplicates("itemId", "attributeId") deduplicates rows by the given columns and returns a new Dataset.
  • df.union(otherDf) concatenates the rows of two DataFrames without deduplication, equivalent to UNION ALL.
  • df.toDF("itemId", "attributeId", "label", "final_result") gives each column of df a meaningful name.
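A quick illustration of these APIs on a tiny in-memory DataFrame (a self-contained sketch, not tied to the ES index above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object DataFrameApiDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val df = Seq((123L, 7378L, "1kg"), (456L, 7378L, "2kg"), (456L, 7378L, "2kg"))
      .toDF("itemId", "attributeId", "label")

    df.printSchema()                                          // inspect the schema
    df.show(false)                                            // show rows without truncating values

    val deduped = df.dropDuplicates("itemId", "attributeId")  // drop duplicate rows by the key columns
    val withResult = deduped
      .withColumn("final_result", lit("PASS"))                // add a constant column via lit
      .filter(col("label").isNotNull)                         // keep rows whose label is not null

    withResult.union(withResult).show()                       // union keeps duplicates, like UNION ALL
    spark.stop()
  }
}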

Scala-to-Java type mapping

  • scala.Long -> long
  • Array[T] -> T[]

Scala-to-Java type conversion

import scala.collection.JavaConverters._
newDf = df.filter(!col("itemId").isin(trainItemIds.asScala.map(Long2long).toList:_*))
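Here trainItemIds is assumed to be a java.util.List[java.lang.Long] handed over from Java code; a self-contained sketch of the conversion (placeholder values):

import scala.collection.JavaConverters._

// a java.util.List[java.lang.Long], e.g. built on the Java side
val trainItemIds: java.util.List[java.lang.Long] =
  java.util.Arrays.asList(java.lang.Long.valueOf(123L), java.lang.Long.valueOf(456L))

// asScala wraps it as a Buffer[java.lang.Long]; Long2long (from Predef) unboxes each element to scala.Long
val excludedIds: List[Long] = trainItemIds.asScala.map(Long2long).toList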

The :_* syntax in Scala

:_* is a special case of type ascription: it tells the compiler to treat a single sequence-typed argument as a sequence of variable arguments (varargs). For example:

val indices = Array("aen-label", "aen-label-seller")
Joiner.on(",").join(java.util.Arrays.asList(indices:_*))

es.nodes.wan.only

This option tells the connector whether it is talking to an Elasticsearch instance in a cloud or otherwise restricted environment over the WAN, such as AWS. It defaults to false. Our company's Elasticsearch cluster runs on AWS and its endpoint is only reachable from the internal network, so the first test runs failed with the following error:

Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available
	at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:159)
	at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
	at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:73)
	at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:72)
	at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:44)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:340)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:723)

Setting the option to true via option("es.nodes.wan.only", value = true) fixed the problem.

import spark.implicits._

While mapping over the DataFrame I hit the following compile error:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._

Before working with a DataFrame you need to add import spark.implicits._, which brings in the implicit encoders that turn common Scala objects into Datasets and DataFrames; it is usually imported right after the SparkSession is obtained.
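A minimal sketch of where the import goes; note that it must be imported from a concrete SparkSession value (a stable identifier):

import org.apache.spark.sql.SparkSession

object ImplicitsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    // import the encoders from this specific session before any toDF/toDS/map calls that need them
    import spark.implicits._

    val doubled = Seq((1L, "a"), (2L, "b"))
      .toDF("id", "name")
      .map(row => row.getLong(0) * 2) // needs an implicit Encoder[Long]
    doubled.show()
    spark.stop()
  }
}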

When Spark SQL reads an array-typed column (as in a Hive table), Scala code gets a WrappedArray, not an Array

Once a temporary view is created via createOrReplaceTempView("temp_labels"), we can query the data with SQL just like a Hive table, for example selecting specific columns:

val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)")

sqlDf.printSchema() shows that the schema of sqlDf looks like this:

root
 |-- itemId: long (nullable = true)
 |-- labels: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- label: string (nullable = true)

labels is an array of structs, so my first attempt was to read the labels column from each row as an Array:

val newDf = sqlDf.map(
  row => {
    val labels = row.getAs[Array[Row]]("labels")
    val labelValue = labels.find(p => p.getAs[Long]("id") == attributeId).map(p => p.getAs[String]("label"))

    (row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
  }
)

which fails with:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lorg.apache.spark.sql.Row;

As the exception shows, Spark SQL stores an array column as a scala.collection.mutable.WrappedArray. From its class definition, WrappedArray indirectly implements the Seq interface, so the column can be read with row.getAs[Seq[Row]]("labels") instead. Note that although Array[T] is defined as a class in the Scala sources, it maps to the native Java array type T[], which is why the cast fails.
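So the working version reads the column as a Seq[Row], the same way the job code earlier does:

val newDf = sqlDf.map(row => {
  // Spark hands the array column back as a WrappedArray, which is a Seq[Row]
  val labels = row.getAs[Seq[Row]]("labels")
  val labelValue = labels
    .find(p => p.getAs[Long]("id") == attributeId)
    .map(p => p.getAs[String]("label"))

  (row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
})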

To check whether a Column is null, use isNull / isNotNull rather than === / !==

With the wrong form the filter silently has no effect, as in:

newDf.filter(col("label") !== null)

This matches how null checks work for Hive and MySQL tables; the correct form is:

newDf.filter(col("label").isNotNull)

Putting the pieces above together, the full cluster-mode version of the job looks like this:

import com.google.common.base.Joiner
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

object TestMain extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val myUtils = new MyUtils
    new TestApp(myUtils).run()
  }
}

class TestApp(myUtils: MyUtils) extends Serializable with LazyLogging {  
  def esDf(spark: SparkSession, indices: Array[String]): DataFrame = {
    spark.read
      .format("es")
      .option("es.nodes", myUtils.esHost())
      .option("es.port", "9200")
      .option("es.nodes.wan.only", value = true)
      .option("es.resource", Joiner.on(",").join(java.util.Arrays.asList(indices:_*)) + "/label")
      .option("es.scroll.size", 2000)
      .load()
  }
  
  def run(): Unit = {
    val spark = myUtils.getSparkSession
    import spark.implicits._

    // placeholders for illustration: in the real job these come from the job arguments
    val jobId = "test-job"
    val itemIdsStr = "123,456"
    val attributeTypeIds = Array("7378")

    val esTempView = "es_label"
    val labelNames = Array("aen-label-retail", "aen-label-seller")
    esDf(spark, labelNames).createOrReplaceTempView(esTempView)
    
    val labelDf = getLabelDf(spark, itemIdsStr, attributeTypeIds, esTempView)
    println("debug log")
    labelDf.printSchema()
    labelDf.show()
    labelDf.createOrReplaceTempView("final_labels")
    
    // temp_request is a view of the original request data, assumed to be registered elsewhere in the job
    val data = spark.sql(
      s"""
      |select cc.*, pp.final_result, pp.label, null as remark
      |from temp_request cc
      |left join final_labels pp
      |on cc.itemid = pp.itemId
      |and cc.attributetypeid = pp.attributeId
      |where cc.profile = '$jobId'
      |""".stripMargin)

    data.distinct().write.mode(SaveMode.Overwrite)
    .option("compression", "gzip")
    .json(s"s3://sherlockyb-test/check-precision/job_id=$jobId")
  }
  
  def getLabelDf(spark: SparkSession, itemIdsStr: String, attributeTypeIds: Array[String], esTempView: String): DataFrame = {
    import spark.implicits._

    val sqlDf = spark.sql(s"select itemId, labels from $esTempView where itemId in ($itemIdsStr)")
    val emptyDf = spark.emptyDataFrame
    var labelDf = emptyDf
    attributeTypeIds.foreach(attributeTypeId => {
      val attributeDf = sqlDf
        .map(row => {
          val labels = row.getAs[Seq[Row]]("labels")
          val labelValue = labels.find(p => p.getAs[Long]("id") == attributeTypeId.toLong).map(p => p.getAs[String]("label"))

          (row.getAs[Long]("itemId"), attributeTypeId.toLong, labelValue.orNull)
        })
        .withColumn("final_result", lit("PASS"))
        .toDF("itemId", "attributeId", "label", "final_result")
        .filter(col("label").isNotNull)
      if (labelDf == emptyDf) {
        labelDf = attributeDf
      } else {
        labelDf = labelDf.union(attributeDf)
      }
    })

    labelDf.dropDuplicates("itemId","attributeId")
  }
}

Appendix: submitting the Spark job

Package the job project as a Jar, upload it to AWS S3, for example under s3://sherlockyb-test/1.0.0/artifacts/spark/, and then submit the Spark job to the cluster via Genie. Genie is a federated job execution engine developed by Netflix; it provides RESTful APIs to run all kinds of big data jobs such as Hadoop, Pig, Hive, Spark, Presto, and Sqoop.

def run_spark(job_name, spark_jar_name, spark_class_name, arg_str, spark_param=''):
    import pygenie

    pygenie.conf.DEFAULT_GENIE_URL = "genie.sherlockyb.club"

    job = pygenie.jobs.GenieJob() \
        .genie_username('sherlockyb') \
        .job_name(job_name) \
        .job_version('0.0.1') \
        .metadata(teamId='team_account') \
        .metadata(teamCredential='team_password')

    job.cluster_tags(['type:yarn-kerberos', 'sched:default'])
    job.command_tags(['type:spark-submit-kerberos', 'ver:2.3.2'])
    job.command_arguments(
        f"--class {spark_class_name} {spark_param} "
        f"s3a://sherlockyb-test/1.0.0/artifacts/spark/{spark_jar_name} "
        f"{arg_str}"
    )

    # Submit the job to Genie
    running_job = job.execute()
    running_job.wait()
    
    return running_job.status

Originally published at: https://www.yangbing.club/2022/06/03/Spark-reading-elasticsearch-guide/
License: Unless otherwise stated, this post is licensed under CC BY-NC-SA 3.0 CN. Please credit the source when reposting!

