
Seeds of mention

Getting started with Google App Engine MapReduce

lakshmi@cloudysunny14.org
mentioned on Wednesday, January 23, 2013

This time, I introduce Google App Engine MapReduce.
Google App Engine MapReduce is built on the Pipeline API
that I introduced before.
It is an original implementation, different from Hadoop MapReduce,
but it is a framework for running distributed processing
on Google App Engine.
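
To make the idea concrete, here is a minimal local sketch of the three MapReduce phases (map, shuffle, reduce) in plain Python. It does not use the App Engine library at all; the names `map_phase`, `shuffle_phase`, `reduce_phase`, `run_local`, `count_map` and `count_reduce` are made up for this illustration.

```python
from collections import defaultdict


def map_phase(records, map_fn):
    """Apply map_fn to every input record and collect the (key, value) pairs."""
    pairs = []
    for record in records:
        pairs.extend(map_fn(record))
    return pairs


def shuffle_phase(pairs):
    """Group values by key, as the shuffle step does between map and reduce."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(grouped, reduce_fn):
    """Apply reduce_fn to each key (in sorted order) and its list of values."""
    results = []
    for key in sorted(grouped):
        results.extend(reduce_fn(key, grouped[key]))
    return results


def run_local(records, map_fn, reduce_fn):
    """Run map, shuffle and reduce one after another in a single process."""
    return reduce_phase(shuffle_phase(map_phase(records, map_fn)), reduce_fn)


def count_map(line):
    """Hypothetical map function: emit (word, 1) for every word in a line."""
    for word in line.lower().split():
        yield (word, 1)


def count_reduce(key, values):
    """Hypothetical reduce function: sum the values collected for one word."""
    yield (key, sum(values))
```

For example, `run_local(["a b a", "b c"], count_map, count_reduce)` groups the pairs by word and sums them. On App Engine the framework spreads the same phases over several shards instead of running them in one process.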

I will introduce it using the demo application.
First, let's create the application by following the Getting Started guide.

Since doing exactly the same as the demo application would be boring,
I tried it with an InputReader I wrote myself.
This InputReader reads from Google Cloud Storage.
First of all, you need to enable Google Cloud Storage.
Then create a bucket and store a file in it.
The following just shows the file stored for the sample.
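
The map function in this post unpacks each input as `k, v = data`, so the reader has to hand over pairs. As a rough sketch of that data shape, assuming the reader yields (byte offset, line) pairs in the style of the library's line-based readers, the following plain-Python helper does the same over any binary file-like object. `iter_lines_with_offsets` is a hypothetical name and is not part of the actual GoogleStorageLineInputReader.

```python
import io


def iter_lines_with_offsets(stream):
    """Yield (byte_offset, line) for each line of a binary file-like object.

    This only imitates the record shape assumed above; it is not the
    real input reader and knows nothing about shards or Cloud Storage.
    """
    offset = 0
    for raw in stream:
        # Strip the line terminator but keep counting it in the offset.
        yield (offset, raw.rstrip(b"\r\n").decode("utf-8"))
        offset += len(raw)


# Stand-in for a file stored in a bucket.
sample = io.BytesIO(b"first line\nsecond line\n")
records = list(iter_lines_with_offsets(sample))
```

Each element of `records` can then be passed to a map function that starts with `k, v = data`.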

GCS Upload

The Python code is as follows:

  • main.py
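import re

from google.appengine.ext import webapp

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline

def split_into_sentences(s):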
  """Split text into list of sentences."""
  s = re.sub(r"\s+", " ", s)
  s = re.sub(r"[.?!]", "\n", s)
  return s.split("\n")

def split_into_words(s):
  """Split a sentence into list of words."""
  s = re.sub(r"\W+", " ", s)
  s = re.sub(r"[_0-9]+", " ", s)
  return s.split()

def word_count_map(data):
  """Word Count map function."""
  k, v = data
  for s in split_into_sentences(v):
    for w in split_into_words(s.lower()):
      yield (w, "")

FILE_NAMES = "/gs/gae_word_count/agrange.txt"

class WordCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Word count demo.

  """
  def run(self):
    yield mapreduce_pipeline.MapreducePipeline(
        "word_count",
        "wordcount.word_count_map",
        "wordcount.word_count_reduce",
        "googlestorage.input_readers.GoogleStorageLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
          "file_paths": FILE_NAMES,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=4)

class Start(webapp.RequestHandler):
  def get(self):
    pipeline = WordCountPipeline() 
    pipeline.start()
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
    self.redirect(path)

application = webapp.WSGIApplication(
                  [("/start", Start)],
                  debug=True)
  • Run

Mapreduce Run

  • Result

Mapreduce Result00

Mapreduce Result01
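
The listing above refers to `wordcount.word_count_reduce` but does not show it. Since the map function emits an empty string for every word occurrence, a reduce function only has to count the values it receives. The following is a sketch modeled on the demo application's word count; that the same function was used here is my assumption.

```python
def word_count_reduce(key, values):
    """Word count reduce function (assumed, not taken from the listing).

    The map function yields (word, "") once per occurrence, so the number
    of values for a key equals the number of times the word appeared.
    """
    yield "%s: %d\n" % (key, len(values))
```

Each yielded string becomes one line of the output file, for example `gae: 3` for a word that appeared three times.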

As you can see, it is easy to experiment with distributed processing like this.
Combining Google App Engine with the other services Google offers also looks interesting.
I'd like to play around with it some more. :)

This entry was tagged #GAE #MapReduce