
Developing Your First Spark Application in Detail (Java Edition)

Welcome to my GitHub

All of Xinchen's original posts (with companion source code) are categorized and summarized here: https://github.com/zq2599/blog_demos

  • WordCount is the classic entry-level demo for big data, so in this article we develop a Java version of WordCount and then submit it to a Spark 2.3.2 environment to run. The environment used here:
  1. OS: CentOS 7
  2. JDK: 1.8.0_191
  3. Spark: 2.3.3
  4. Scala: 2.11.12
  5. Hadoop: 2.7.7
  6. Maven: 3.5.0

About the Hadoop environment

  • This article uses Hadoop's HDFS; for deploying Hadoop, see 《Linux部署hadoop2.7.7集群》 (Deploying a Hadoop 2.7.7 cluster on Linux).

About the Spark environment

  • This article uses Spark 2.3.3; for deploying the Spark cluster, see 《部署spark2.2集群(standalone模式)》 (Deploying a Spark 2.2 cluster in standalone mode).
    Note that the spark-core jar for Spark 2.3.3 does not support Scala 2.12, so please use Scala 2.11 when deploying Spark.

About the application developed in this article

  • The application developed here is the classic WordCount: given a text file, count how many times each word appears, take the 10 most frequent words, print them, and save the result to a file in HDFS.

The text used for the word count

  • The txt file used here is the English edition of Gone with the Wind: I found a PDF version online and exported it to txt; you can of course pick any txt file you like for the test.
  • On the machine hosting the HDFS service, run the following command to create the input folder:
~/hadoop-2.7.7/bin/hdfs dfs -mkdir /input
  • On the machine hosting the HDFS service, run the following command to create the output folder:
~/hadoop-2.7.7/bin/hdfs dfs -mkdir /output
  • Copy the txt file to the machine hosting the HDFS service, then run the following command to upload it to the /input folder in HDFS (a Java alternative to these CLI steps is sketched right after the command):
~/hadoop-2.7.7/bin/hdfs dfs -put ~/GoneWiththeWind.txt /input
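  • If you would rather script these HDFS steps from Java instead of the hdfs CLI, the Hadoop FileSystem API can do the same thing. The sketch below is my own illustration, not part of the original post: the NameNode address, user name, and local file path are placeholders, and it assumes a hadoop-client dependency matching your cluster is on the classpath.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {

    public static void main(String[] args) throws Exception {
        //placeholder NameNode address: adjust host and port to your cluster
        URI hdfsUri = URI.create("hdfs://192.168.119.163:8020");

        //connect as the "hadoop" user that owns the HDFS directories in this article
        try (FileSystem fs = FileSystem.get(hdfsUri, new Configuration(), "hadoop")) {
            //same effect as the two "hdfs dfs -mkdir" commands above
            fs.mkdirs(new Path("/input"));
            fs.mkdirs(new Path("/output"));

            //same effect as "hdfs dfs -put ~/GoneWiththeWind.txt /input"
            fs.copyFromLocalFile(new Path("/home/hadoop/GoneWiththeWind.txt"),
                                 new Path("/input/GoneWiththeWind.txt"));

            System.out.println("uploaded: " + fs.exists(new Path("/input/GoneWiththeWind.txt")));
        }
    }
}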
  • The following sections describe the coding process in detail. If you would rather not write the code yourself, you can download the complete source from GitHub; the addresses and links are listed below:
Name | Link | Remarks
Project home page | https://github.com/zq2599/blog_demos | the project's home page on GitHub
Git repository (https) | https://github.com/zq2599/blog_demos.git | repository address of the project source, https protocol
Git repository (ssh) | git@github.com:zq2599/blog_demos.git | repository address of the project source, ssh protocol
  • This git project contains multiple folders; the source code for this chapter is in the sparkwordcount folder.
  • Create a Maven-based Java application named sparkwordcount; the content of pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bolingcavalry</groupId>
    <artifactId>sparkwordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>false</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.bolingcavalry.sparkwordcount.WordCount</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  • Create the WordCount class. The key points are commented in the code below, so they are not repeated here (a local-mode variant for quick testing is sketched right after the class):
package com.bolingcavalry.sparkwordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @Description: hands-on Spark WordCount
 * @author: willzhao E-mail: [email protected]
 * @date: 2019/2/8 17:21
 */
public class WordCount {

    public static void main(String[] args) {
        String hdfsHost = args[0];
        String hdfsPort = args[1];
        String textFileName = args[2];

        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://" + hdfsHost + ":" + hdfsPort;
        //HDFS path of the input text file
        String inputPath = hdfsBasePath + "/input/" + textFileName;

        //HDFS path of the output directory
        String outputPath = hdfsBasePath + "/output/"
                       + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());

        System.out.println("input path : " + inputPath);

        System.out.println("output path : " + outputPath);

        //load the text file
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

        JavaPairRDD<String, Integer> counts = textFile
                //split each line into words and flatten them into one big collection
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                //key is the word, value is 1
                .mapToPair(word -> new Tuple2<>(word, 1))
                //reduce by key, summing the values
                .reduceByKey((a, b) -> a + b);

        //swap key and value, then sort by key
        JavaPairRDD<Integer, String> sorts = counts
                //swap key and value to build a new pair RDD
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
                //sort by key in descending order
                .sortByKey(false);

        //take the top 10
        List<Tuple2<Integer, String>> top10 = sorts.take(10);

        //print them
        for(Tuple2<Integer, String> tuple2 : top10){
            System.out.println(tuple2._2() + "\t" + tuple2._1());
        }

        //coalesce to a single partition, then save as one text file in HDFS
        javaSparkContext.parallelize(top10).coalesce(1).saveAsTextFile(outputPath);

        //close the context
        javaSparkContext.close();
    }
}
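  • Before packaging and submitting to the cluster, it can be handy to smoke-test the same logic in local mode. The class below is a sketch of my own, not part of the original project: it embeds Spark in the current JVM via local[*] and reads a placeholder local file instead of HDFS, so it can be run straight from the IDE to confirm the logic before moving on to the cluster submission below.

package com.bolingcavalry.sparkwordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Local-mode smoke test: same word-count logic as WordCount, but runs
 * inside a single JVM and reads a local file, so no cluster or HDFS is needed.
 */
public class LocalWordCount {

    public static void main(String[] args) {
        //local[*] embeds Spark in this JVM and uses all available cores
        SparkConf sparkConf = new SparkConf()
                .setAppName("Local WordCount smoke test")
                .setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        //placeholder local path; point it at any text file on your machine
        String inputPath = "file:///tmp/GoneWiththeWind.txt";

        //count occurrences of each word, exactly as in WordCount
        JavaPairRDD<String, Integer> counts = javaSparkContext.textFile(inputPath)
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        //swap to (count, word) and sort by count, descending
        JavaPairRDD<Integer, String> sorts = counts
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
                .sortByKey(false);

        //print the ten most frequent words
        List<Tuple2<Integer, String>> top10 = sorts.take(10);
        for (Tuple2<Integer, String> tuple2 : top10) {
            System.out.println(tuple2._2() + "\t" + tuple2._1());
        }

        javaSparkContext.close();
    }
}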
  • Run the following command in the directory containing pom.xml to compile and build the jar:
mvn clean package -Dmaven.test.skip=true
  • After a successful build, the file sparkwordcount-1.0-SNAPSHOT.jar is generated in the target directory; upload it to the ~/jars/ directory on the Spark server.
  • Assuming the Spark server's IP address is 192.168.119.163, run the following command on the Spark server to submit the job:
~/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \
--master spark://192.168.119.163:7077 \
--class com.bolingcavalry.sparkwordcount.WordCount \
--executor-memory 512m \
--total-executor-cores 2 \
~/jars/sparkwordcount-1.0-SNAPSHOT.jar \
192.168.119.163 \
8020 \
GoneWiththeWind.txt
  • The last three arguments of the command above are the input parameters of the Java main method; see the WordCount source for how they are used.
  • The job starts running as soon as the submission succeeds; output similar to the following indicates that the job has finished:
2019-02-08 21:26:04 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-02-08 21:26:04 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-02-08 21:26:04 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-c3e2ea9e-7daf-4cab-a207-26f0a0394017
2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-d60e4d75-4189-4f33-a5e2-fbe9b06bdae7
  • Scrolling back through the console output, you can see that the ten most frequent words have already been printed to the console:
2019-02-08 21:36:15 INFO  DAGScheduler:54 - Job 1 finished: take at WordCount.java:61, took 0.313008 s
the	18264
and	14150
to	10020
of	8615
a	7571
her	7086
she	6217
was	5912
in	5751
had	4502
2019-02-08 21:36:15 INFO  deprecation:1173 - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
2019-02-08 21:36:15 INFO  FileOutputCommitter:108 - File Output Committer Algorithm version is 1
  • Run the file-listing command on the HDFS server; a new subdirectory 20190208213610 has been created under /output:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2019-02-08 21:36 /output/20190208213610
  • Listing that subdirectory shows two files inside:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output/20190208213610
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2019-02-08 21:36 /output/20190208213610/_SUCCESS
-rw-r--r--   3 hadoop supergroup        108 2019-02-08 21:36 /output/20190208213610/part-00000
  • The /output/20190208213610/part-00000 file seen above is the output; view its content with the cat command:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -cat /output/20190208213610/part-00000
(18264,the)
(14150,and)
(10020,to)
(8615,of)
(7571,a)
(7086,her)
(6217,she)
(5912,was)
(5751,in)
(4502,had)
  • This matches the console output shown earlier (see the sketch below for a friendlier file format).
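  • The (count,word) lines above are simply the default toString of each Tuple2 that saveAsTextFile writes. If you would prefer the file to contain the same word-and-count form printed to the console, one option (my own sketch, not in the original code) is to map the tuples to strings before saving, i.e. replace the save line in WordCount with something like:

        //format each tuple as "word<TAB>count" before writing a single output file
        javaSparkContext.parallelize(top10)
                .coalesce(1)
                .map(tuple2 -> tuple2._2() + "\t" + tuple2._1())
                .saveAsTextFile(outputPath);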
  • The Spark web UI shows the information of the job that was just executed.
  • That completes the development and running of the first Spark application; in upcoming articles, we will work through more Spark practice together.

Welcome to follow my 51CTO blog: 程序员欣宸 (Programmer Xinchen)

You are not alone on the learning journey; Xinchen's original posts will keep you company all the way…
