EMR是Amazon亞馬遜雲計算平台提供的一項服務,用戶可以在此平台使用亞馬遜強大的計算資源執行Map Reduce程序。由於Map Reduce很多情況下都是做的海量數據文本統計類的並行計算任務,需要耗費很多時間,使用雲計算則可以大大加快執行速度。

EMR建立在Amazon S3Amazon EC2的基礎上。用戶提交一個map程序和一個reduce程序,同時提交需要處理的數據文件作為輸入。這些都上傳到Amazon S3雲端存儲平台,在EMR中指定相應的S3路徑,就可以開始做數據處理了。EMR會根據用戶指定的規模配置,開啟一個EC2集群,在每個節點上運行Hadoop。運行結束後用戶可以從S3獲取結果數據。下面以word count單詞統計任務為例,介紹具體操作過程。

1. 登錄Amazon AWS控制台,選擇新建任務(點擊查看大圖)

mapreduce1

2. 在運行Map Reduce任務前,我們需要先將Map和Reduce程序,以及待分析的輸入文件上傳到S3中

mapreduce2

3. 回到新建任務上來,輸入任務信息

mapreduce3

4. 程序位置、輸入文件位置等信息,這些文件都預先上傳到了S3里

mapreduce4

Mapper和Reducer程序,請參考Word Count。本博客的MapReduce分類下的一些其他python示例,也可以使用。

5. 配置EC2實例相關參數

mapreduce5

6. 此步可跳過

mapreduce6

7. 所有任務信息匯總確認

mapreduce7

8. 監控Map Reduce任務運行狀態

mapreduce8

9. 任務執行完畢

mapreduce9

10. 從S3中查看或下載輸出結果

mapreduce10

Amazon的雲計算平台提供了很多API,使得開發者能根據自己的需要與雲端交互。boto(Amazon官方介紹)就是一個python API,作為一個腳本語言,使用python可以以簡短的代碼寫出原型程序,快速實現或檢驗自己的想法。下面是使用boto與Amazon EMR交互的例子。

#!/usr/bin/python
#
#  Amazon EMR Interface
#  A program to run and monitor streaming job flow on Amazon EMR
#  Author: Zeng, Xi
#  SID:    1010105140
#  Email:  [email protected]
import time
connected = 0
jobid =0
 
def connect():
    access_key = raw_input('Your access key:').strip()
    secret_key = raw_input('Your secret key:').strip()
    from boto.emr.connection import EmrConnection
    global conn
    conn = EmrConnection(access_key, secret_key)
    global connected
    connected = 1
 
def run():
    if connected == 0:
        print 'Not connected!'
    elif connected == 1:
        s_mapper = raw_input('Path of the mapper:').strip()
        s_reducer = raw_input('Path of the reducer:').strip()
        s_input = raw_input('Path of input files:').strip()
        s_output = raw_input('Path for storing output files:').strip()
        from boto.emr.step import StreamingStep
        step = StreamingStep(name='My steps', mapper=s_mapper, reducer=s_reducer, input=s_input, output=s_output)
 
        jf_name = raw_input('Name of job flow:').strip()
        jf_log = raw_input('Path of logs:').strip()
        instance_type = raw_input('Type of instance:').strip()
        instance_num = raw_input('Number of instance:').strip()
        global jobid
        jobid = conn.run_jobflow(name=jf_name,log_uri=jf_log, steps=[step], slave_instance_type=instance_type,num_instances=int(instance_num))
        print 'jobid = '+ jobid
        while True:
            time.sleep(5)
            status = conn.describe_jobflow(jobid)
            print status.state
            if 'ENDED' in status.state:
                break
 
def showMenu():
    title = '''
        Amazon EMR Service
 
    connect        Connect to Amazon EMR
    run        Input job flow info and run on Amazon EMR
    quit        Quit
 
Enter choice:'''
    while True:
        choice = raw_input(title).strip().lower()
        choices =  ['connect','run','quit']
        if choice not in choices:
            print('Input Error!')
        else:
            if choice == 'quit':
                break
            elif choice == 'connect':
                connect()
            elif choice == 'run':
                run()
if __name__ == '__main__':
    showMenu()