4

elasticsearch-spark的用法 - ZepheryWen

 2 years ago
source link: https://www.cnblogs.com/w1570631036/p/16299110.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

elasticsearch-spark的用法

作者:Zephery
个人网址:http://www.wenzhihuai.com
本文为作者原创,转载请注明出处:https://www.cnblogs.com/w1570631036/p/16299110.html


一、原生RDD支持
1.1 基础配置
1.2 读取es数据
1.3 写数据
二、Spark Streaming
三、Spark SQL
四、Spark Structure Streaming

Hadoop允许Elasticsearch在Spark中以两种方式使用:通过自2.1以来的原生RDD支持,或者通过自2.0以来的Map/Reduce桥接器。从5.0版本开始,elasticsearch-hadoop就支持Spark 2.0。目前spark支持的数据源有:
(1)文件系统:LocalFS、HDFS、Hive、text、parquet、orc、json、csv
(2)数据RDBMS:mysql、oracle、mssql
(3)NOSQL数据库:HBase、ES、Redis
(4)消息对象:Redis

elasticsearch相对hdfs来说,容易搭建、并且有可视化kibana支持,非常方便spark的初学入门,本文主要讲解用elasticsearch-spark的入门。

Spark - Apache Spark

一、原生RDD支持

1.1 基础配置

相关库引入:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.13</artifactId>
            <version>8.1.3</version>
        </dependency>

SparkConf配置,更多详细的请点击这里或者源码ConfigurationOptions

public static SparkConf getSparkConf() {
    SparkConf sparkConf = new SparkConf().setAppName("elasticsearch-spark-demo");
    sparkConf.set("es.nodes", "host")
            .set("es.port", "xxxxxx")
            .set("es.nodes.wan.only", "true")
            .set("es.net.http.auth.user", "elxxxxastic")
            .set("es.net.http.auth.pass", "xxxx")
            .setMaster("local[*]");
    return sparkConf;
}

1.2 读取es数据

这里用的是kibana提供的sample data里面的索引kibana_sample_data_ecommerce,也可以替换成自己的索引。

public static void main(String[] args) {
    SparkConf conf = getSparkConf();
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {

        JavaPairRDD<String, Map<String, Object>> esRDD =
                JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce");
        esRDD.collect().forEach(System.out::println);
    }
}

esRDD同时也支持query语句esRDD(final JavaSparkContext jsc, final String resource, final String query),一般对es的查询都需要根据时间筛选一下,不过相对于es的官方sdk,并没有那么友好的api,只能直接使用原生的dsl语句。

1.3 写数据

支持序列化对象、json,并且能够使用占位符动态索引写入数据(使用较少),不过多介绍了。

public static void jsonWrite(){
    String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
    String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
    JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
    JavaEsSpark.saveJsonToEs(stringRDD, "spark-json");
}

比较常用的读写也就这些,更多可以看下官网相关介绍。

二、Spark Streaming

spark的实时处理,es5.0的时候开始支持,Spark Streaming中的DStream编程接口是RDD,我们需要对RDD进行处理,处理起来较为费劲且不美观。

在spark streaming中,如果我们需要修改流程序的代码,在修改代码重新提交任务时,是不能从checkpoint中恢复数据的(程序就跑不起来),是因为spark不认识修改后的程序了。

public class EsSparkStreaming extends EsBaseConfig {
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkConf conf = getSparkConf();
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Seconds.apply(1));

        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
        Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
        microbatches.add(javaRDD);
        JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

        JavaEsSparkStreaming.saveToEs(javaDStream, "spark-streaming");

        jssc.start();
    }
}

这里没有执行awaitTermination,执行代码后没有卡住,即可在es上查看

image-20220522204822320

三、Spark SQL

elasticsearch-hadoop也提供了spark sql的插件,换言之,elasticsearch变成了Spark SQL的原生数据源,可以通过Spark SQL显示调用,下面的例子将kibana_sample_data_ecommerce索引读取,然后转化成dataset,在用sql来统计出当前货币。

public class EsToMysqlDemo extends EsBaseConfig {
    public static void main(String[] args) {
        SparkConf conf = getSparkConf();
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            SparkSession sparkSession = SparkSession.builder()
                    .config(conf)
                    .getOrCreate();
            JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce").values();
            JavaRDD<Row> map = esRDD.map(v -> {
                String currency = v.get("currency").toString();
                String customerFullName = v.get("customer_full_name").toString();
                String productsSku = v.getOrDefault("products", "").toString();

                return RowFactory.create(currency, customerFullName, productsSku);
            });
            Dataset<Row> dataset = sparkSession.createDataFrame(map, StructType.fromDDL("currency string,customer_full_name string,products string"));
            dataset.show(2);

            Dataset<Row> count = dataset.select("currency").groupBy("currency").count();
            count.show(2);


        }
    }
}

第一个show展示了当前的dataset,第二个show展示group by之后的结果。

image-20220522204908187

四、Spark Structure Streaming

Structured Streaming使用DataFrame、DataSet的编程接口,处理数据时可以使用Spark SQL中提供的方法,数据的转换和输出会变得更加简单。

在structured streaming中,对于指定的代码修改操作,是不影响修改后从checkpoint中恢复数据的。具体可参见文档。下面这个例子是从控制台中读取数据,然后根据","切割,把第一个赋值给name,然后写入到es的spark-structured-streaming索引中去,启动程序前需要在控制台执行下命令:nc -lk 9999。

@Data
public static class PersonBean {
    private String name;
    private String surname;
}

public static void main(String[] args) throws StreamingQueryException {
    SparkConf sparkConf = getSparkConf();
    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();


    Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();

    Dataset<PersonBean> people = lines.as(Encoders.STRING())
            .map((MapFunction<String, PersonBean>) value -> {
                String[] split = value.split(",");
                PersonBean personBean = new PersonBean();
                personBean.setName(split[0]);
                return personBean;
            }, Encoders.bean(PersonBean.class));

    StreamingQuery es = people.writeStream().option("checkpointLocation", "./location")
            .format("es").start("spark-structured-streaming");
    es.awaitTermination();
}

checkpointLocation是用来设置检查点,里面会存储一些commits、offsets、sinks、metadata的信息。

image-20220522204637235

执行完nc -lk 9999后,在控制台随便输入,即可在es中查看响应的结果。

image-20220522204930968

相关源代码:

spark-java-demo

1.Apache Spark support

2.elasticsearch-hadoop

3.使用SparkSQL操作Elasticsearch - Spark入门教程

4.Spark——Spark Streaming 对比 Structured Streaming


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK