EMR是Amazon亚马逊云计算平台提供的一项服务,用户可以在此平台使用亚马逊强大的计算资源执行Map Reduce程序。由于Map Reduce很多情况下都是做的海量数据文本统计类的并行计算任务,需要耗费很多时间,使用云计算则可以大大加快执行速度。

EMR建立在Amazon S3Amazon EC2的基础上。用户提交一个map程序和一个reduce程序,同时提交需要处理的数据文件作为输入。这些都上传到Amazon S3云端存储平台,在EMR中指定相应的S3路径,就可以开始做数据处理了。EMR会根据用户指定的规模配置,开启一个EC2集群,在每个节点上运行Hadoop。运行结束后用户可以从S3获取结果数据。下面以word count单词统计任务为例,介绍具体操作过程。

1. 登录Amazon AWS控制台,选择新建任务(点击查看大图)

mapreduce1

2. 在运行Map Reduce任务前,我们需要先将Map和Reduce程序,以及待分析的输入文件上传到S3中

mapreduce2

3. 回到新建任务上来,输入任务信息

mapreduce3

4. 程序位置、输入文件位置等信息,这些文件都预先上传到了S3里

mapreduce4

Mapper和Reducer程序,请参考Word Count。本博客的MapReduce分类下的一些其他python示例,也可以使用。

5. 配置EC2实例相关参数

mapreduce5

6. 此步可跳过

mapreduce6

7. 所有任务信息汇总确认

mapreduce7

8. 监控Map Reduce任务运行状态

mapreduce8

9. 任务执行完毕

mapreduce9

10. 从S3中查看或下载输出结果

mapreduce10

Amazon的云计算平台提供了很多API,使得开发者能根据自己的需要与云端交互。boto(Amazon官方介绍)就是一个python API,作为一个脚本语言,使用python可以以简短的代码写出原型程序,快速实现或检验自己的想法。下面是使用boto与Amazon EMR交互的例子。

#!/usr/bin/python
#
#  Amazon EMR Interface
#  A program to run and monitor streaming job flow on Amazon EMR
#  Author: Zeng, Xi
#  SID:    1010105140
#  Email:  [email protected]
import time
connected = 0
jobid =0
 
def connect():
    access_key = raw_input('Your access key:').strip()
    secret_key = raw_input('Your secret key:').strip()
    from boto.emr.connection import EmrConnection
    global conn
    conn = EmrConnection(access_key, secret_key)
    global connected
    connected = 1
 
def run():
    if connected == 0:
        print 'Not connected!'
    elif connected == 1:
        s_mapper = raw_input('Path of the mapper:').strip()
        s_reducer = raw_input('Path of the reducer:').strip()
        s_input = raw_input('Path of input files:').strip()
        s_output = raw_input('Path for storing output files:').strip()
        from boto.emr.step import StreamingStep
        step = StreamingStep(name='My steps', mapper=s_mapper, reducer=s_reducer, input=s_input, output=s_output)
 
        jf_name = raw_input('Name of job flow:').strip()
        jf_log = raw_input('Path of logs:').strip()
        instance_type = raw_input('Type of instance:').strip()
        instance_num = raw_input('Number of instance:').strip()
        global jobid
        jobid = conn.run_jobflow(name=jf_name,log_uri=jf_log, steps=[step], slave_instance_type=instance_type,num_instances=int(instance_num))
        print 'jobid = '+ jobid
        while True:
            time.sleep(5)
            status = conn.describe_jobflow(jobid)
            print status.state
            if 'ENDED' in status.state:
                break
 
def showMenu():
    title = '''
        Amazon EMR Service
 
    connect        Connect to Amazon EMR
    run        Input job flow info and run on Amazon EMR
    quit        Quit
 
Enter choice:'''
    while True:
        choice = raw_input(title).strip().lower()
        choices =  ['connect','run','quit']
        if choice not in choices:
            print('Input Error!')
        else:
            if choice == 'quit':
                break
            elif choice == 'connect':
                connect()
            elif choice == 'run':
                run()
if __name__ == '__main__':
    showMenu()