3.1 Creating and Running a Job
This document presents an overview of the MapReduce Python API. It consists of the following sections:
- Including the MapReduce library
- Instantiating a MapReduce pipeline
- Starting a MapReduce job
- Showing the MapReduce status monitor
- Determining when a MapreducePipeline job is complete
To enable the MapReduce framework in your app, add the following include to the app.yaml configuration file:

```yaml
includes:
- mapreduce/include.yaml
```
In your code, you instantiate a `MapreducePipeline` object inside the `run` method of a `PipelineBase` subclass, as follows:
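```python
import logging

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline


class WordCountPipeline(base_handler.PipelineBase):
    """Counts the words in the files of a zip archive stored in the Blobstore."""

    def run(self, filekey, blobkey):
        logging.debug("filename is %s", filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "word_count",
            "main.word_count_map",
            "main.word_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.FileOutputWriter",
            mapper_params={
                "input_reader": {
                    "blob_key": blobkey,
                },
            },
            reducer_params={
                "output_writer": {
                    "mime_type": "text/plain",
                    "output_sharding": "input",
                    "filesystem": "blobstore",
                },
            },
            shards=16)
        # StoreOutput is another pipeline defined in the same module (not shown).
        # Because it receives `output`, it runs only after the MapReduce job's
        # output is available.
        yield StoreOutput("WordCount", filekey, output)
```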
The following arguments are supplied to the `MapreducePipeline` object. The Pipeline API passes constructor arguments on to the object's `run` method.
- The name of the MapReduce job, for display in the user interface and in any logs
- The mapper function, given as a fully qualified name string
- The reducer function, given as a fully qualified name string
- The input reader class that supplies the mapper function with data
- The output writer class that the reducer function writes to
- `mapper_params`: the parameters (if any) for the input reader, under the `input_reader` key
- `reducer_params`: the parameters (if any) for the output writer, under the `output_writer` key
- `shards`: the number of shards (workers) to use for the MapReduce job
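The mapper, reducer, input reader and output writer are passed as dotted-path strings rather than as Python objects. Each one must therefore be importable by that name, which is why the mapper is a top-level function in `main.py` referenced as `"main.word_count_map"`. The sketch below roughly approximates how such a string can be turned back into an object using only `importlib`. It is not the library's own resolver, and `resolve_dotted_name` is a hypothetical name.

```python
import importlib


def resolve_dotted_name(dotted_name):
    """Resolve a string such as "main.word_count_map" to the object it names.

    Hypothetical helper, an approximation only: it imports the longest
    importable module prefix, then looks up the remaining parts as attributes.
    """
    parts = dotted_name.split(".")
    # Try "a.b.c" as a module, then "a.b", then "a".
    for i in range(len(parts) - 1, 0, -1):
        module_name = ".".join(parts[:i])
        try:
            obj = importlib.import_module(module_name)
        except ImportError:
            continue
        # Walk the rest of the path as attributes (function, class, ...).
        for attr in parts[i:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError("cannot resolve %r" % dotted_name)
```

For example, `resolve_dotted_name("os.path.join")` returns the `os.path.join` function. A mapper defined inside a class or another function cannot be reached this way from a module-level path, so keep mapper and reducer functions at module level.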
You must write your own mapper and reducer functions. (The shuffle stage is built in; you don't invoke it explicitly.) For input and output, you can use the library's standard input readers and output writers, such as `BlobstoreZipInputReader` and `FileOutputWriter` in the example.
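A minimal sketch of the two functions the example refers to as `main.word_count_map` and `main.word_count_reduce` follows. It is loosely modeled on the library's word-count demo. It assumes that `BlobstoreZipInputReader` hands the mapper one `(zipinfo, read_fn)` tuple per file in the archive, with `read_fn()` returning that file's contents; check this against your library version. The word-splitting regular expression is also an assumption. The mapper emits one `(word, "")` pair per occurrence. After the shuffle, the reducer receives each word with the list of its values, so the list's length is the word's count.

```python
# main.py: module-level functions, so they can be referenced by name
# as "main.word_count_map" and "main.word_count_reduce".
import logging
import re

# Assumed tokenizer: runs of lowercase letters, digits and apostrophes.
_WORD_RE = re.compile(r"[a-z0-9']+")


def word_count_map(data):
    """Mapper: yields (word, "") once for every word in one zip member.

    Assumes `data` is a (zipinfo, read_fn) tuple from BlobstoreZipInputReader.
    """
    zipinfo, read_fn = data
    logging.debug("Got %s", zipinfo.filename)
    text = read_fn()
    if isinstance(text, bytes):
        # Assumes the archive members are UTF-8 text.
        text = text.decode("utf-8")
    for word in _WORD_RE.findall(text.lower()):
        yield (word, "")


def word_count_reduce(key, values):
    """Reducer: `values` holds one entry per occurrence of `key`."""
    yield "%s: %d\n" % (key, len(values))
```

Each string the reducer yields is written out by the configured output writer, one line per word.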
To start the MapReduce job, call the `start` method, inherited from the Pipeline base class, on the pipeline that contains the `MapreducePipeline` (here, `WordCountPipeline`), as shown below:
```python
# A method of your webapp2.RequestHandler subclass.
def post(self):
    filekey = self.request.get("filekey")
    blob_key = self.request.get("blobkey")
    if self.request.get("word_count"):
        pipeline = WordCountPipeline(filekey, blob_key)
        pipeline.start()
```
If you wish, you can display the status monitor for a MapReduce job by redirecting the user to the pipeline's status page after starting it:
```python
# A method of your webapp2.RequestHandler subclass.
def post(self):
    filekey = self.request.get("filekey")
    blob_key = self.request.get("blobkey")
    if self.request.get("word_count"):
        pipeline = WordCountPipeline(filekey, blob_key)
        pipeline.start()
        # Send the user to the status page for this pipeline.
        redirect_url = "%s/status?root=%s" % (pipeline.base_path,
                                              pipeline.pipeline_id)
        self.redirect(redirect_url)
```
To find out whether your MapReduce job is complete, you need to save the pipeline ID when you start the job, as shown in the following request handler:
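```python
import webapp2
from mapreduce import mapreduce_pipeline

class StartMapreduce(webapp2.RequestHandler):
    def get(self):
        # `arguments` stands for the constructor arguments described earlier.
        pipeline = mapreduce_pipeline.MapreducePipeline(arguments)
        pipeline.start()
        self.redirect("/wait?pipeline=" + pipeline.pipeline_id)
```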
Notice that the redirect URL carries the pipeline ID as a query parameter; this is how the ID is saved.
Then, in the handler that should do some work once the MapReduce job is complete, retrieve the pipeline using the saved pipeline ID and check whether it is done:
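```python
import webapp2
from mapreduce import mapreduce_pipeline

class WaitHandler(webapp2.RequestHandler):
    def get(self):
        # The ID passed along by StartMapreduce's redirect.
        pipeline_id = self.request.get("pipeline")
        pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
        if pipeline is None:
            # No pipeline exists with this ID.
            self.abort(404)
        if pipeline.has_finalized:
            # MapreducePipeline has completed
            pass
        else:
            # MapreducePipeline is still running
            pass
```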
As shown above, the `has_finalized` attribute of the `MapreducePipeline` object (accessed without parentheses) tells you whether the job has completed.
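The completion check can be kept in a plain function, separate from the request handler, so that it can be unit-tested with a stub object instead of a live pipeline. In the sketch below, `job_state` and its three state strings are hypothetical. The sketch relies only on `has_finalized` and on the assumption that `from_id` returns `None` when no pipeline record matches the ID.

```python
def job_state(pipeline):
    """Classify the result of MapreducePipeline.from_id (hypothetical helper).

    Returns "missing" if the lookup found nothing, "done" once the pipeline
    has finalized, and "running" otherwise.
    """
    if pipeline is None:
        return "missing"
    if pipeline.has_finalized:
        return "done"
    return "running"
```

In the wait handler, this becomes `state = job_state(mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id))`, followed by a branch on `state`.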