A native Python implementation of Spark's RDD interface. Instead of being resilient and distributed, it is just transient and local, but fast (lower latency than PySpark). It is a drop-in replacement for PySpark's `SparkContext` and `RDD`.

Use case: you have a pipeline that processes 100k input documents and converts them to normalized features, which are then used to train a local scikit-learn classifier. That preprocessing is a perfect fit for a full Spark job. Now you want to use the trained classifier in an API endpoint, which needs the same preprocessing pipeline for a single document per call. This does not have to run in parallel, but initialization overhead should be small and there should preferably be no dependency on the JVM. This is where `pysparkling` shines.
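As a rough sketch of that serving path (the `extract_features` function and the document path below are hypothetical placeholders, not part of pysparkling):

```python
import pysparkling

def extract_features(line):
    # hypothetical stand-in for the preprocessing shared with the batch Spark job
    return line.strip().lower().split()

# a local, JVM-free context with little startup overhead
context = pysparkling.Context()

# the same pipeline, applied to a single document inside an API handler
features = context.textFile('/path/to/one_document.txt').map(extract_features).collect()
```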
Install it with pip:

```sh
pip install pysparkling
```
Features:

- Parallelization via `multiprocessing.Pool`, `concurrent.futures.ThreadPoolExecutor` or any other Pool-like object that has a `map(func, iterable)` method (see the sketch after this list).
- AWS S3 is supported. Use file paths of the form `s3n://bucket_name/filename.txt` with `Context.textFile()`. Specify multiple files separated by commas. Use the environment variables `AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` for authentication. Mixed local and S3 files are supported. Glob expressions (filenames with `*` and `?`) are resolved.
- Lazy execution is in development.
- Seamless handling of compressed files is not supported yet.
- Only dependency: `boto` for AWS S3 access.
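For example, any Pool-like object can be handed to the context; the bucket name below is a placeholder and the `AWS_*` environment variables are assumed to be set (a sketch, not a tested recipe):

```python
import concurrent.futures

import pysparkling

# any object with a map(func, iterable) method can serve as the pool
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
context = pysparkling.Context(pool=pool)

# mixed local and S3 paths, comma separated, with glob expressions
lines = context.textFile('tests/*.py,s3n://my-bucket/logs/part*')
print(lines.count())
```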
Count the lines in the `*.py` files in the `tests` directory:

```python
import pysparkling

context = pysparkling.Context()
print(context.textFile('tests/*.py').count())
```
The `Context` class provides:

- `__init__(pool=None)`: takes a pool object (an object that has a `map()` method, e.g. a `multiprocessing.Pool`) used to parallelize all `map()` and `foreach()` methods.
- `textFile(filename)`: load every line of a text file into an RDD. `filename` can contain a comma-separated list of many files, `?` and `*` wildcards, file paths on S3 (`s3n://bucket_name/filename.txt`) and local file paths (`relative/path/my_text.txt`, `/absolute/path/my_text.txt` or `file:///absolute/file/path.txt`). If the filename points to a folder containing `part*` files, those are resolved.
- `broadcast(var)`: returns an instance of `Broadcast()`; its value is accessed with `value` (see the sketch after this list).
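A small sketch of `broadcast()` together with `value` (the stop-word set is made up for illustration):

```python
import pysparkling

context = pysparkling.Context()

# broadcast a read-only lookup table once ...
stopwords = context.broadcast({'the', 'a', 'an'})

# ... and read it back inside transformations through .value
words = context.textFile('tests/*.py').flatMap(lambda line: line.split())
print(words.filter(lambda w: w.lower() not in stopwords.value).count())
```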
The `RDD` class provides (a short usage sketch follows this list):

- `cache()`: execute previous steps and cache the result
- `coalesce()`: do nothing
- `collect()`: return the underlying list
- `count()`: get the length of the internal list
- `countApprox()`: same as `count()`
- `countByKey()`: input is a list of pairs, returns a dictionary
- `countByValue()`: input is a list, returns a dictionary
- `context()`: return the context
- `distinct()`: returns a new RDD containing the distinct elements
- `filter(func)`: return a new RDD filtered with func
- `first()`: return the first element
- `flatMap(func)`: return a new RDD of a flattened map
- `flatMapValues(func)`: return a new RDD
- `fold(zeroValue, op)`: aggregate the elements
- `foldByKey(zeroValue, op)`: aggregate the elements by key
- `foreach(func)`: apply func to every element in place
- `foreachPartition(func)`: same as `foreach()`
- `groupBy(func)`: group by the output of func
- `groupByKey()`: group by key where the RDD is of type `[(key, value), ...]`
- `histogram(buckets)`: buckets can be a list or an int
- `id()`: currently just returns None
- `intersection(other)`: return a new RDD with the intersection
- `isCheckpointed()`: returns False
- `join(other)`: join
- `keyBy(func)`: creates tuples in a new RDD
- `keys()`: returns the keys of tuples in a new RDD
- `leftOuterJoin(other)`: left outer join
- `lookup(key)`: return the list of values for this key
- `map(func)`: apply func to every element and return a new RDD
- `mapValues(func)`: apply func to the value in (key, value) pairs and return a new RDD
- `max()`: get the maximum element
- `mean()`: mean
- `min()`: get the minimum element
- `name()`: the RDD's name
- `persist()`: implemented as a synonym for `cache()`
- `pipe(command)`: pipe the elements through an external command line tool
- `reduce()`: reduce
- `reduceByKey()`: reduce by key and return the new RDD
- `rightOuterJoin(other)`: right outer join
- `saveAsTextFile(path)`: save the RDD as a text file
- `subtract(other)`: return a new RDD without the elements in other
- `sum()`: sum
- `take(n)`: get the first n elements
- `takeSample(n)`: get n random samples
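As an illustration, a few of these methods chained together over the same `tests` directory as above (a sketch; any text files would do):

```python
import pysparkling

context = pysparkling.Context()
lines = context.textFile('tests/*.py')

# element-wise transformations return new RDDs
lengths = lines.map(len).filter(lambda n: n > 0)
print(lengths.count(), lengths.min(), lengths.max(), lengths.mean())

# key-value helpers expect an RDD of (key, value) pairs
pairs = lines.keyBy(lambda line: line.lstrip().startswith('#'))
print(pairs.countByKey())
```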
The `Broadcast` class provides:

- `value`: access the value it stores