【译】pyspark 包(一)


PySpark是Python针对Spark的API。
Public类:

  • SparkContext: Spark功能的主要切入点。
  • RDD:弹性分布式数据库,Spark中基本的抽象。
  • Broadcast:任务之间能够重用的传播变量。
  • Accumulator:一个“只增加”的共享变量,让你只可以对钙质进行增加操作。
  • SparkConf:配置Spark。
  • SparkFiles:任务之间可以传递的可读取文件。
  • StorageLevel:更细粒度的缓存持久级别。
  • TaskContex:关于目前正在运行任务的信息,在workers和实验中可用。

SparkConf

pyspark.SparkConf(loadDefaults=true,_jvm=None,_jconf=None)
对Spark应用的配置,用来将各种Spark参数设置为键值对。

大多数情况下,我们需要使用SparkConf()来创建一个SparkConf对象,它将会从spark.*Java系统属性中导入配置。在这种情况下,我们直接在SparkConf对象中设置的参数优先级会高于系统属性。

在单元测试的情况下,我们也可以调用SparkConf(false)来跳过导入外部设置,在无论系统属性为何值的情况下都使用相同的配置。

所有在该类中的setter方法都支持链式规则。例如,我们可以写conf.setMaster("local").setAppName("My app")

注意:一旦SparkConf
对象传递给Spark,它会被复制,不能再被用户修改。

  • contains(key):配置中是否包含某个给定的键?
  • get(key, defaultValue=None): 获得一些键的配置值,如果不存在则返回一个默认值。
  • getAll():获取所有值作为一个键值对的列表。
  • set(key, value):设置一个配置属性。
  • setAll(pairs):设置多个参数,使用键值对列表传递(参数:pairs-需要设置的键值对列表)。
  • setAppName(value):设置应用名称。
  • setExecutorEnv(key=None, value=None, pairs=None):设置传递给executors的环境变量。
  • setIfMissing(key, value):如果一个属性未被设置,那么就设置一个配置属性。
  • setMaster(value):设置连接到的master URL。
  • setSparkHone(value):设置安装在worker节点上Spark的安装路径。
  • toDebugString()返回配置的可打印版本,每一行为一个key=value对的列表。

SparkContext

pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=)
Spark功能的主要程序入口。SparkContext表示到Spark集群的连接,可以用来创建RDD,并且可以在集群上传播变量。

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

  • accumulator(value, accum_param=None),创建一个给定初始值的Accumulator,使用一个给定的AccumulatorParam帮助对象来自定义如何增加数据类型的值。如果没有提供的话,默认的累积参数通常是证书和浮点数。对于其它类型,可以使用自定义的累积参数。
  • addFile(path, recursive=False):添加一个该Spark任务的每个节点上需要下载的文件。path可以是一个本地文件或者是HDFS上的文件(或者是其它Hadoop支持的文件系统——,或者是HTTP,HTTPS或者FTP的URI。如果想要获取Spark任务中的文件的话,可以使用L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}和文件名来找到它的下载路径。如果将递归选项设置为True的话,可以给定一个目录。目前,只支持Hadoop支持的文件系统的目录。
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...    with open(SparkFiles.get("test.txt")) as testFile:
...        fileVal = int(testFile.readline())
...        return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]
  • addPyFile(path):添加一个.py或者.zip依赖可以在之后在SparkContext中所有任务用来执行。path可以是一个本地文件或者是HDFS(或者是其它Hadoop支持的文件系统),或者是HTTP,HTTPS或者FTP URI。
  • applicationId:Spark 应用的唯一标识。它的格式根据调度程序实现的不同来决定。(本地spark应用的id类似于:‘local-1433865536131’,YARN的情况下类似于‘application_1433865536131_34483’)
>>> sc.applicationId  
u'local-...'
  • binaryFiles(path, minPartitions=None):(实验)从HDFS中读取二进制文件的目录,一个本地文件系统(所有节点上可用),或者其它Hadoop支持的文件系统的URI作为字节数组。每个文件被读取为一个单一的记录并且返回一个键值对,其中键为每个文件的路劲,值为每个文件的内容。(注意:更倾向于小文件,也允许大文件,但是可能会带来糟糕的性能。)
  • binaryRecords(path, recordLength):(实验)从扁平的二进制文件中载入数据,假设每条记录是特定数字格式的一系列数字(查看ByteBuffer),并且每条记录的字节数目是常数。(参数:path-输入数据文件的目录;recordLength-分割记录的长度)
  • broadcast(value):创博一个只读的变量到集群,返回一个L{Broadcast<pyspark.broadcast.Broadcast>}对象可以在分布式函数中读取。该变量只会被传送到每个集群一次
  • cancelAllJobs():取消所有被调度或者正在运行的任务。
  • cancelJobGroup(groupId):取消指定组的活跃任务,可以查看SparkContext.setJobGroup了解更多信息
  • defaultMinPartitions:当用户没有给定Hadoop RDD的分片时的默认最小数目
  • defaultParallelism:当用户没有给定时的默认并发级别(例如reduce任务)
  • dump_profiles(path):将配置文件的状态导入到path目录
  • emptyRDD():创建没有分区或元素的RDD
  • getConf()
  • getLocalProperty(key):获取在该线程中设置的本地属性,如果没有的话则返回null,可以查看setLocalProperty
  • getOrCreate(conf=None):获得或初始化一个SparkContext,并且将其注册为单例模式。(参数:conf-SparkConf(可选))
  • hadoopFile((path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0):从HDFS/本地文件系统(所有节点可用)或任何Hadoop支持的文件系统上URI读取一个“旧”Haddop输入形式——随机的key和value类。机制与sc.sequenceFile相同,Hadoop配置可以使用Python字典的形式传递,将会被转化到Java中的配置。(参数:path-Hadoop文件的路径;inputFormatClass-Hadoop输入形式的完全限定的类名,例如-“org.apache.hadoop.mapred.TextInputFormat”;keyClass-key Writable类的完全限定类名,例如“org.apache.hadoop.io.Text”;valueClass-value Writable类的完全限定类名,例如“org.apache.hadoop.io.LongWritable”;keyConverter-默认为None;valueConvert-默认为None;conf-Hadoop配置,以字典的形式传递,默认为None;batchSize-Python对象的数目表示为一个单一的Java对象,默认为0,自动选择批量大小)
  • hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0):从一个随机的Hadoop配置中读取一个“旧”Haddop输入形式——随机的key和value类,配置是通过Python字典进行传递的。它将转化为Java中的配置,其机制与sc.sequnceFile相同。(参数:path-Hadoop文件的路径;inputFormatClass-Hadoop输入形式的完全限定的类名,例如-“org.apache.hadoop.mapred.TextInputFormat”;keyClass-key Writable类的完全限定类名,例如“org.apache.hadoop.io.Text”;valueClass-value Writable类的完全限定类名,例如“org.apache.hadoop.io.LongWritable”;keyConverter-默认为None;valueConvert-默认为None;conf-Hadoop配置,以字典的形式传递,默认为None;batchSize-Python对象的数目表示为一个单一的Java对象,默认为0,自动选择批量大小)
  • newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0):从HDFS/本地文件系统(所有节点可用)或任何Hadoop支持的文件系统上URI读取一个“旧”Haddop输入形式——随机的key和value类。机制与sc.sequenceFile相同,Hadoop配置可以使用Python字典的形式传递,将会被转化到Java中的配置。(参数:path-Hadoop文件的路径;inputFormatClass-Hadoop输入形式的完全限定的类名,例如-“org.apache.hadoop.mapred.TextInputFormat”;keyClass-key Writable类的完全限定类名,例如“org.apache.hadoop.io.Text”;valueClass-value Writable类的完全限定类名,例如“org.apache.hadoop.io.LongWritable”;keyConverter-默认为None;valueConvert-默认为None;conf-Hadoop配置,以字典的形式传递,默认为None;batchSize-Python对象的数目表示为一个单一的Java对象,默认为0,自动选择批量大小)
  • newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0):从一个随机的Hadoop配置中读取一个“旧”Haddop输入形式——随机的key和value类,配置是通过Python字典进行传递的。它将转化为Java中的配置,其机制与sc.sequnceFile相同。(参数:path-Hadoop文件的路径;inputFormatClass-Hadoop输入形式的完全限定的类名,例如-“org.apache.hadoop.mapred.TextInputFormat”;keyClass-key Writable类的完全限定类名,例如“org.apache.hadoop.io.Text”;valueClass-value Writable类的完全限定类名,例如“org.apache.hadoop.io.LongWritable”;keyConverter-默认为None;valueConvert-默认为None;conf-Hadoop配置,以字典的形式传递,默认为None;batchSize-Python对象的数目表示为一个单一的Java对象,默认为0,自动选择批量大小)
  • parallelize(c, numSlices=None):将一个本地的Python集合进行分发形成一个RDD,如果输入表示一个范围的话,为了提高性能,推荐使用xrange。
>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
  • pickleFile(name, minPartitions=None):导入之前使用RDD.saveAsPickleFile方法存储的RDD。
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  • range(start, end=None, step=1, numSlices=None):创建一个包含从startend(不包含)元素的RDD,每个元素增加step。和python内置的range()调用方式相同。如果使用一个单一的参数调用,该参数被解析为endstart被设为0。(参数:start-起始值;end-终止值,不包含;step-增加的步长,默认为1;numSlices-新RDD的分片数目;返回-int型的RDD)
>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

参考文献

打赏

mickey

记录生活,写给几十年后的自己。