最新公告
  • 欢迎光临数据科学与编程,我们是数据学科学兴趣交流小组立即加入我们
  • 【PySpark源码解析】教你用Python调用高效Scala接口

    众所周知,Spark 框架主要是由 Scala 语言实现,同时也包含少量 Java 代码。Spark 面向用户的编程接口,也是 Scala。然而,在数据科学领域,Python 一直占据比较重要的地位,仍然有大量的数据工程师在使用各类 Python 数据处理和科学计算的库,例如 numpy、Pandas、scikit-learn 等。同时,Python 语言的入门门槛也显著低于 Scala。
     
    为此,Spark 推出了 PySpark,在 Spark 框架上提供一套 Python 的接口,方便广大数据科学家使用。本文主要从源码实现层面解析 PySpark 的实现原理,包括以下几个方面:

     

    • PySpark 的多进程架构;
    • Python 端调用 Java、Scala 接口;
    • Python Driver 端 RDD、SQL 接口;
    • Executor 端进程间通信和序列化;
    • Pandas UDF;
    • 总结。

     

    PySpark项目地址:https://github.com/apache/spark/tree/master/python

     

    1、PySpark 的多进程架构

     

    PySpark 采用了 Python、JVM 进程分离的多进程架构,在 Driver、Executor 端均会同时有 Python、JVM 两个进程。当通过 spark-submit 提交一个 PySpark 的 Python 脚本时,Driver 端会直接运行这个 Python 脚本,并从 Python 中启动 JVM;而在 Python 中调用的 RDD 或者 DataFrame 的操作,会通过 Py4j 调用到 Java 的接口。
     
    在 Executor 端恰好是反过来,首先由 Driver 启动了 JVM 的 Executor 进程,然后在 JVM 中去启动 Python 的子进程,用以执行 Python 的 UDF,这其中是使用了 socket 来做进程间通信。总体的架构图如下所示:
     

    2、Python Driver 如何调用 Java 的接口

     

    上面提到,通过 spark-submit 提交 PySpark 作业后,Driver 端首先是运行用户提交的 Python 脚本,然而 Spark 提供的大多数 API 都是 Scala 或者 Java 的,那么就需要能够在 Python 中去调用 Java 接口。这里 PySpark 使用了 Py4j 这个开源库。当创建 Python 端的 SparkContext 对象时,实际会启动 JVM,并创建一个 Scala 端的 SparkContext 对象。

    3、Python Driver 端的 RDD、SQL 接口
     
    在 PySpark 中,继续初始化一些 Python 和 JVM 的环境后,Python 端的 SparkContext 对象就创建好了,它实际是对 JVM 端接口的一层封装。和 Scala API 类似,SparkContext 对象也提供了各类创建 RDD 的接口,和 Scala API 基本一一对应,我们来看一些例子。
     

    可以看到,这里 Python 端基本就是直接调用了 Java/Scala 接口。而 PythonRDD (core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala),则是一个 Scala 中封装的伴生对象,提供了常用的 RDD IO 相关的接口。另外一些接口会通过 self._jsc 对象去创建 RDD。其中 self._jsc 就是 JVM 中的 SparkContext 对象。拿到 RDD 对象之后,可以像 Scala、Java API 一样,对 RDD 进行各类操作,这些大部分都封装在 python/pyspark/rdd.py 中。

     

    这里的代码中出现了 jrdd 这样一个对象,这实际上是 Scala 为提供 Java 互操作的 RDD 的一个封装,用来提供 Java 的 RDD 接口,具体实现在 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala 中。可以看到每个 Python 的 RDD 对象需要用一个 JavaRDD 对象去创建。
    对于 DataFrame 接口,Python 层也同样提供了 SparkSession、DataFrame 对象,它们也都是对 Java 层接口的封装,这里不一一赘述。
    4、Executor 端进程间通信和序列化

     

    对于 Spark 内置的算子,在 Python 中调用 RDD、DataFrame 的接口后,从上文可以看出会通过 JVM 去调用到 Scala 的接口,最后执行和直接使用 Scala 并无区别。而对于需要使用 UDF 的情形,在 Executor 端就需要启动一个 Python worker 子进程,然后执行 UDF 的逻辑。那么 Spark 是怎样判断需要启动子进程的呢?
    在 Spark 编译用户的 DAG 的时候,Catalyst Optimizer 会创建 BatchEvalPython 或者 ArrowEvalPython 这样的 Logical Operator,随后会被转换成 PythonEvals 这个 Physical Operator。在 PythonEvals(sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:

    这里 env.createPythonWorker 会通过 PythonWorkerFactory(core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去启动 Python 进程。Executor 端启动 Python 子进程后,会创建一个 socket 与 Python 建立连接。所有 RDD 的数据都要序列化后,通过 socket 发送,而结果数据需要同样的方式序列化传回 JVM。

     

    对于直接使用 RDD 的计算,或者没有开启 spark.sql.execution.arrow.enabled 的 DataFrame,是将输入数据按行发送给 Python,可想而知,这样效率极低。

     

    在 Spark 2.2 后提供了基于 Arrow 的序列化、反序列化的机制(从 3.0 起是默认开启),从 JVM 发送数据到 Python 进程的代码在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。这个类主要是重写了 newWriterThread 这个方法,使用了 ArrowWriter 向 socket 发送数据:
     

    5、Pandas UDF

     

    前面我们已经看到,PySpark 提供了基于 Arrow 的进程间通信来提高效率,那么对于用户在 Python 层的 UDF,是不是也能直接使用到这种高效的内存格式呢?答案是肯定的,这就是 PySpark 推出的 Pandas UDF。区别于以往以行为单位的 UDF,Pandas UDF 是以一个 Pandas Series 为单位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 这个参数来控制。这是一个来自官方文档的示例:

    上文已经解析过,PySpark 会将 DataFrame 以 Arrow 的方式传递给 Python 进程,Python 中会转换为 Pandas Series,传递给用户的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 来完成计算,在易用性和性能上都得到了很大的提升。

     

    6、总结

     

    PySpark 为用户提供了 Python 层对 RDD、DataFrame 的操作接口,同时也支持了 UDF,通过 Arrow、Pandas 向量化的执行,对提升大规模数据处理的吞吐是非常重要的,一方面可以让数据以向量的形式进行计算,提升 cache 命中率,降低函数调用的开销,另一方面对于一些 IO 的操作,也可以降低网络延迟对性能的影响。

     

    然而 PySpark 仍然存在着一些不足,主要有:

     

    • 进程间通信消耗额外的 CPU 资源;
    • 编程接口仍然需要理解 Spark 的分布式计算原理;
    • Pandas UDF 对返回值有一定的限制,返回多列数据不太方便。

     

    Databricks 提出了新的 Koalas 接口来使得用户可以以接近单机版 Pandas 的形式来编写分布式的 Spark 计算作业,对数据科学家会更加友好。而 Vectorized Execution 的推进,有望在 Spark 内部一切数据都是用 Arrow 的格式来存放,对跨语言支持将会更加友好。同时也能看到,在这里仍然有很大的性能、易用性的优化空间,这也是我们平台近期的主要发力方向之一。
    陈绪,汇量科技(Mobvista)高级算法科学家,负责汇量科技大规模数据智能计算引擎和平台的研发工作。在此之前陈绪是阿里巴巴高级技术专家,负责阿里集团大规模机器学习平台的研发。
    免责声明:本文内容来源于网络,文章版权归原作者所有,意在传播相关技术知识&行业趋势,供大家学习交流,若涉及作品版权问题,请联系删除或授权事宜。
    本站上原创文章未经作者许可,不得用于商业用途,仅做学习交流使用,本站免责声明。转载请注明出处,否则保留追究法律责任的权利。《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权
    数据科学与编程 » 【PySpark源码解析】教你用Python调用高效Scala接口

    发表评论

    • 52会员总数(位)
    • 310资源总数(个)
    • 32本周发布(个)
    • 1 今日发布(个)
    • 331稳定运行(天)

    提供最优质的博文资源集合

    立即阅览 了解详情