Windows平台下单机Spark环境搭建

为了在有限的资源上学习大数据处理与分析技术,借鉴Linux以及部分网上的教程,在Windows10平台搭建Spark环境。本文将简单记录搭建流程以及其中遇到的坑。

Spark的部署模式主要有四种:

  • Local模式(单机模式)
  • Standalone模式(使用Spark自带的简单集群管理器)
  • YARN模式(使用YARN作为集群管理器)
  • Mesos模式(使用Mesos作为集群管理器)

安装Java

  • Oracle Java 官网下载JDK并安装,安装路径建议直接选择C:\Java,不要安装在Program Files中(路径有空格会导致后面配置Hadoop比较麻烦)
    • 添加环境变量JAVA_HOME,值为安装路径,如C:\Java\jdk1.8.0_121
    • 在环境变量Path中增加值:%JAVA_HOME%\bin
    • 打开命令行测试是否安装成功,输入java -version,应该出现如下信息
      命令行查看Java版本信息

安装Spark

  • Apache Spark 官网下载预编译的压缩文件,解压到某个路径中不含空格的文件夹下,也就成为Spark的安装路径,如D:\spark
    下载Spark

    • 添加环境变量SPARK_HOME,值为安装路径,如D:\spark
    • 在环境变量Path中增加值:%SPARK_HOME%\bin%SPARK_HOME%\sbin
    • 如果下载的Spark版本>=2.3,建议进一步添加环境变量SPARK_LOCAL_HOSTNAME,值为localhost
    • 进入Spark的配置目录conf,复制一个log4j.properties.template文件并命名为log4j.properties,打开log4j.properties文件,进行如下修改

      # log4j.rootCategory=INFO, console
      log4j.rootCategory=WARN, console
    • 同样在Spark的配置目录conf,复制一个spark-env.sh.template文件并命名为spark-env.sh,打开并增加以下一行代码

      SPARK_LOCAL_IP = 127.0.0.1

安装Hadoop

  • Apache Hadoop 官网下载预编译的压缩包(这里为了更好对应,选择下载2.7版本),解压到某个路径中不含空格的文件夹下,也就称为Hadoop的安装路径,如D:\hadoop
    下载Hadoop(binary版本)

    • 添加环境变量HADOOP_HOME,值为安装路径,如D:\hadoop
    • 在环境变量Path中增加值:%HADOOP_HOME%\bin%HADOOP_HOME%\sbin
    • 进入Hadoop的配置目录etc\hadoop,打开文件hadoop-env.cmd,修改Java的安装路径,如果Java安装在Program Files可以通过设置为PROGRA~1解决空格报错的问题

      set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_121
    • 下载对应版本的 winutils,把下载到的bin文件夹覆盖到Hadoop安装目录的bin文件夹,确保其中含有winutils.exe文件

    • 新建tmp\hive文件夹,如C:\tmp\hive,命令行导航到Hadoop的bin目录,执行以下授权操作

      winutils.exe chmod -R 777 C:\tmp\hive
    • 最后在命令行输入hadoop version测试是否安装成功
      验证Hadoop安装成功

验证Spark安装成功

  • 打开命令行,运行spark-shell,应该输入如下内容
    验证Spark安装成功
  • 此时进入localhost:4040可以看到Spark的Web界面

使用Spark开发第一个程序

Python

安装PySpark

  • 把Spark安装路径下的python\pyspark文件夹复制到系统Python的包文件夹下,例如在Anaconda环境中,复制到D:\Anaconda3\Lib\site-packages目录下
  • 安装Python包py4j,在命令行运行pip install py4j
  • 验证PySpark配置成功,在命令行输入pyspark,应该输出如下内容
    验证PySpark环境可用

在PyCharm中使用PySpark

下面以一个经典的词频统计(Word Count)程序为例,学习PySpark的使用,词频统计是一个很经典的分布式程序,这里用到中文分词库jieba,去除停用词再进行计数

# -*- coding: utf-8 -*-
# 从pyspark.context模块导入SparkContext
from pyspark.context import SparkContext
import jieba

# 实例化一个SparkContext,用于连接Spark集群
# 第一个参数“local”表示以本地模式加载集群
# 第二个参数“WordCount”表示appName,不能有空格
spark = SparkContext("local", "WordCount")

# 读取数据,创建弹性式分布数据集(RDD)
data = spark.textFile(r"path/to/news.txt")

# 读取中文停用词
with open(r'path/to/stopwords-zh.txt', 'r', encoding='utf-8') as f:
s = f.readline()
stop = [i.replace('\n','') for i in s]

# 分词并统计词频
data = data.flatMap(lambda line: jieba.cut(line,cut_all=False)).\
filter(lambda w: w not in stop).\
map(lambda w: (w,1)).\
reduceByKey(lambda w0, w1: w0 + w1).\
sortBy(lambda x: x[1], ascending=False)

# 输出前100个高频词汇
print(data.take(100))
  • 设置程序运行配置,打开Run->Edit Configuration,按照如下图所示内容新建一个配置,其中环境变量必须加入SPARK_HOMEHADOOP_HOME以及SPARK_LOCAL_HOSTNAME
    设置PyCharm运行配置

  • 运行程序,最后输出前100个高频词语
    WordCount程序输出

程序提交到Spark运行

上述词频统计代码也可以直接提交到Spark运行,方法如下:

  • 打开命令行,导航到Spark的安装目录,执行提交任务命令:

    cd D:/spark
    ./bin/spark-submit /path/to/wordcount.py
  • 最后输出类似的执行结果
    提交Spark任务,并输出运行结果

Scala & Java

Java

  • 在 IntelliJ IDEA 新建一个Maven工程
  • 在项目的Maven配置文件pom.xml中加入Spark-core依赖,根据安装的Spark版本到 Maven Repository 仓库找到对应的Maven依赖文本,如:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.3</version>
    </dependency>
  • 打开工程目录下的主程序文件,通常为./src/main/java/App.java,编写词频统计代码

  • 下面将以两种形式进行编写,Java Lambda的代码风格接近Python,易于阅读;而Java原生模式则稍显复杂

Java Lambda

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
* Word count!
*
*/
public class App
{
public static void main( String[] args )
{
// 创建Spark实例
SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);

// 读取数据,这里是一个关于Spark介绍的文本
String filename = "path/to/spark.txt";
JavaRDD<String> data = jsc.textFile(filename);

// 切割压平
JavaRDD<String> dataMap = data.flatMap(t -> Arrays.asList(t.split(" ")).iterator());

// 组合成元组
JavaPairRDD<String, Integer> dataPair = dataMap.mapToPair(t -> new Tuple2<>(t,1));

// 分组聚合
JavaPairRDD<String, Integer> dataAgg = dataPair.reduceByKey((w1,w2) -> w1+w2);

// 交换key,再排序
JavaPairRDD<Integer, String> dataSwap = dataAgg.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> dataSort = dataSwap.sortByKey(false);
JavaPairRDD<String, Integer> result = dataSort.mapToPair(tp -> tp.swap());

// 保存结果,saveAsTextFile()方法是将RDD写到本地,根据执行task的多少生成多少个文件
// 输出目录不能预先存在,否则报错
result.saveAsTextFile("path/to/spark_count");

// 输出第一个
List<Tuple2<String, Integer>> resList = result.collect();
for (Tuple2<String, Integer> tp: resList){
System.out.println(tp._1+"\t"+tp._2);
}

jsc.stop();
}
}
  • 最后打开输出结果文件夹的part-00000文件,输出各个单词的统计数:
    (Spark,7)
    (and,7)
    (the,5)
    (Apache,5)
    (of,4)
    (for,3)
    ...

Java 原生模式

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
* Word count!
*
*/
public class App
{
public static void main( String[] args )
{
// 创建Spark实例
SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);

// 读取数据,这里是一个关于Spark介绍的文本
String filename = "path/to/spark.txt";
JavaRDD<String> data = jsc.textFile(filename);

// 切割压平
JavaRDD<String> dataMap = data.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});

// 组合成元组
JavaPairRDD<String, Integer> dataPair = dataMap.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});

// 分组聚合
JavaPairRDD<String, Integer> dataAgg = dataPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer w1, Integer w2) throws Exception {
return w1 + w2;
}
});

// 交换key,再排序
JavaPairRDD<Integer, String> dataSwap = dataAgg.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
return tp.swap();
}
});

JavaPairRDD<Integer, String> dataSort = dataSwap.sortByKey(false);

JavaPairRDD<String, Integer> result = dataSort.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
return tp.swap();
}
});

// 保存结果,saveAsTextFile()方法是将RDD写到本地,根据执行task的多少生成多少个文件
// 输出目录不能预先存在,否则报错
result.saveAsTextFile("path/to/spark_count");

// 输出第一个
List<Tuple2<String, Integer>> resList = result.collect();
for (Tuple2<String, Integer> tp: resList){
System.out.println(tp._1+"\t"+tp._2);
}

jsc.stop();
}
}
  • 最后结果与上面情况类似

Scala

(待更新)

文章作者: yxnchen
文章链接: http://yxnchen.github.io/technique/Windows平台下单机Spark环境搭建/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 YXN's Blog