定义

Spark是一个高效,通用的大数据处理引擎。

背景

  • 2009年,Spark诞生于伯克利大学AMPLab,最初属于伯克利大学的研究性项目。
  • 2010年,正式开源。
  • 2013年,成为了Apache基金项目,同年,基于spark的开源商业公司Databricks成立。
  • 2014年,成为Apache基金的顶级项目。
spark相关组件

MapReduce & Spark

1.png

七个MapReduce作业意味着需要七次读取和写入HDFS,而它们的输入输出数据存在关联,七个作业输入输出数据关系如下图。

2.jpg

基于MapReduce实现此算法存在以下问题:

  • 为了实现一个业务逻辑需要使用七个MapReduce作业,七个作业间的数据交换通过HDFS完成,增加了网络和磁盘的开销。
  • 七个作业都需要分别调度到集群中运行,增加了Gaia集群的资源调度开销。
  • MR2和MR3重复读取相同的数据,造成冗余的HDFS读写开销。

这些问题导致作业运行时间大大增长,作业成本增加。相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic Graph) 编程模型, 不仅包含传统的map、reduce接口, 还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。使用Spark编程接口实现上述的业务逻辑如下图所示。

3.jpg

相对于MapReduce,Spark在以下方面优化了作业的执行时间和资源使用。

  • DAG编程模型。 通过Spark的DAG编程模型可以把七个MapReduce简化为一个Spark作业。Spark会把该作业自动切分为八个Stage,每个Stage包含多个可并行执行的Tasks。Stage之间的数据通过Shuffle传递。最终只需要读取和写入HDFS一次。减少了六次HDFS的读写,读写HDFS减少了70%。
  • Spark作业启动后会申请所需的Executor资源,所有Stage的Tasks以线程的方式运行,共用Executors,相对于MapReduce方式,Spark申请资源的次数减少了近90%。
  • Spark引入了RDD(Resilient Distributed Dataset)模型,中间数据都以RDD的形式存储,而RDD分布存储于slave节点的内存中,这就减少了计算过程中读写磁盘的次数。RDD还提供了Cache机制,例如对上图的rdd3进行Cache后,rdd4和rdd7都可以访问rdd3的数据。相对于MapReduce减少MR2和MR3重复读取相同数据的问题。

附(spark统计字符串代码)

WordCount.java

public class WordCount {
    
    // 比较器,其中的Tuple2是模仿的scala写法,
    // 诸如此类的还有Tuple3,Tuple4,Tuple22
    public static class TupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            return o2._2.compareTo(o1._2);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 使用local模式,不需要启动spark集群
        SparkConf sparkConf = new SparkConf().setAppName("wordCount ").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> file = ctx.textFile("分析的文件路径", 6);
        file.persist(StorageLevel.MEMORY_ONLY());
        file.cache();
        Comparator<Tuple2<String, Integer>> orderCompare = new TupleComparator();
        List<Tuple2<String, Integer>> wordToCounts = file
                .flatMap(line -> Arrays.asList(line.split("")).iterator())
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))//把分割的内容作为key,1作为初始值
                .reduceByKey((s1, s2) -> s1 + s2)// 将相同的key进行reduce,并将value相加
                .takeOrdered(50, orderCompare);
        wordToCounts.forEach(line -> System.out.println(line._1() + ":" + line._2()));
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>liao</groupId>
  <artifactId>wordcount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>wordcount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.1.0</spark.version>
  </properties>
  <dependencies>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.4</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!--<dependency>-->
    <!--<groupId>org.apache.hadoop</groupId>-->
    <!--<artifactId>hadoop-client</artifactId>-->
    <!--<version>2.6.5</version>-->
    <!--<scope>compile</scope>-->
    <!--</dependency>-->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.1</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.1.1</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.ansj</groupId>
      <artifactId>ansj_seg</artifactId>
      <version>5.1.6</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.4.1</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.0.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐