Hadoop与S3的集成可以参考官方说明。我在这里被s3/s3n/s3a给搞混淆了,实际上对于AWS来说,只有s3协议,但是hadoop由于历史原因,逐步实现了s3->s3n->s3a三种访问S3服务的协议,支持的文件也越来越大。由于我们使用的是自建S3 Service,而只有s3a支持设置endpoint,虽然号称“仍有bug”,但也只能硬着头皮上了。

Hadoop配置S3集成

修改core-site.xml,增加如下配置:

<property>
  <name>fs.s3a.access.key</name>
  <value>xxxxxx</value>
  <description>AWS access key ID. Omit for Role-based authentication.</description>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <value>yyyyy</value>
  <description>AWS secret key</description>
</property>

<property>
  <name>fs.s3a.endpoint</name>
  <value>192.168.103.224:80</value>
  <description>AWS S3 endpoint to connect to. An up-to-date list is
    provided in the AWS Documentation: regions and endpoints. Without this
    property, the standard region (s3.amazonaws.com) is assumed.
  </description>
</property>

<property>
  <name>fs.s3a.connection.ssl.enabled</name>
  <value>false</value>
  <description>Enables or disables SSL connections to S3.</description>
</property>

分别说明下配置项的功能,这些选项在org.apache.hadoop.fs.s3a.Constants中有定义,后面spark集成S3时也会用到。

  • fs.s3a.access.key/fs.s3a.secret.key,即AK
  • fs.s3a.endpoint,即S3服务地址端口号,注意填写IP,不要用FQDN或者hostname,否则会报Caused by: java.net.UnknownHostException: xxx.zelda2。使用主机名的形式,会造成按AWS的方式来访问,即bucketname.awsxxx.com这种。
  • fs.s3a.connection.ssl.enabled,是否开启ssl,我们的S3服务使用http,需要填写为否。

配置完成后记得重启HDFS/YARN。

试一下文件上传下载好不好使:

./bin/hadoop fs -ls s3a://xxx/diamonds.csv
-rw-rw-rw-   1    2772075 2016-07-26 09:59 s3a://xxx/diamonds.csv
./bin/hadoop distcp /mapred s3a://xxx/
./bin/hadoop fs -ls s3a://xxx/
Found 2 items
-rw-rw-rw-   1    2772075 2016-07-26 09:59 s3a://xxx/diamonds.csv
-rw-rw-rw-   1       5953 2016-07-26 09:55 s3a://xxx/mapred
./bin/hadoop fs -get s3a://xxx/mapred

注意fs -ls的时候,bucketname后面有个斜杠,不加会报错:ls: `s3a://xxx’: No such file or directory。 distcp可以把HDFS的文件拷贝到s3上去,当然更方便的是直接使用fs -put。

再跑个MR:

$ ./bin/hadoop jar /home/ieevee/hadoop/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount  s3a://xxx/diamonds.csv /xxx2
16/07/27 09:59:05 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
..
16/07/27 09:59:06 INFO mapred.MapTask: Processing split: s3a://xxx/diamonds.csv:0+2772075
16/07/27 09:59:06 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/07/27 09:59:06 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
..

Signature V2/V4

OK,现在hadoop可以集成S3 Service,不过发现一个悲剧的事情,s3cmd不能读取这个bucket了。

ieevee@zelda1:~$ s3cmd ls
2016-07-25 10:12  s3://xxx
ieevee@zelda1:~$ s3cmd ls s3://xxx
ERROR: S3 error: 403 (SignatureDoesNotMatch)

看上去是我在.s3cfg里填写的AK错误,但并没有填错。把s3cmd的debug打开,可以看到:

$ s3cmd -d ls s3://xxx
DEBUG: s3cmd version 1.6.1
DEBUG: ConfigParser: Reading file '/home/ieevee/.s3cfg'
..
DEBUG: CreateRequest: resource[uri]=/
DEBUG: Using signature v4

原来s3cmd默认用的是v4的signature,而hadoop都还是v2(别问我怎么知道的,直觉。。),加上参数即可:

$ s3cmd --signature-v2 ls s3://xxx
2016-07-26 01:59   2772075   s3://xxx/diamonds.csv
2016-07-26 01:55      5953   s3://xxx/mapred

!! 这是一个困扰了我很久的问题:Signature版本。

在调试Spark集成S3 Service的时候,我用过一段时间的minio来提供S3 Service,我的bucket、object都是通过minio Browser创建的,但是Spark在操作s3的时候,总是会报错Bad Request(Exception in thread “main” com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: null),但我自己在windows本地写的操作S3的代码(如下),类似的逻辑,并没有报错。

        String accessKey = "xxxx";
        String secretKey = "yyyy/zzzz";

        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);

        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProtocol(Protocol.HTTP);

        AmazonS3Client conn = new AmazonS3Client(credentials, clientConfig);
        conn.setEndpoint("192.168.103.228:9000");

        if (!conn.doesBucketExist("xxx")) {
            System.out.println("Bucket " + "xxx" + " does not exist");
        }

二者不同的地方是,我自己的工程maven依赖aws-java-sdk是1.11.19版本,而hadoop还是1.7.4。通过抓包对比,发现1.11.19版本正常交互的http请求报文如下:

HEAD /xxx/ HTTP/1.1..
Host: 192.168.103.228:9000..
x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855..
Authorization: AWS4-HMAC-SHA256 Credential=E1S2J126W149HX250245/20160725/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=ed31685d71e03518b0f048798ca7d125246f155d6e5c9bfa9cf06524c5251002..
X-Amz-Date: 20160725T091527Z..
User-Agent: aws-sdk-java/1.11.19 Windows_7/6.1 Java_HotSpot(TM)_64-Bit_Server_VM/25.40-b25/1.8.0_40..
amz-sdk-invocation-id: 7ba4c0c3-f73a-a8cb-f6ba-f104e66688fc..
amz-sdk-retry: 0/0/500..
Content-Type: application/octet-stream..

而1.7.4版本bad request的http请求报文是:

HEAD /xxx/ HTTP/1.1....
Host: 192.168.103.228:9000..
Authorization: AWS 2F9ZA80YUF2EFIXT2HD7:3Nb0IOuR1MxDzGSbTmwq95ySc0s=....
User-Agent: aws-sdk-java/1.7.4 Windows_7/6.1 Java_HotSpot(TM)_64-Bit_Server_VM/25.40-b25/1.8.0_40....
Date: Mon, 25 Jul 2016 12:01:36 GMT....
Content-Type: application/x-www-form-urlencoded; charset=utf-8....
Connection: Keep-Alive....

可以看到1.11.19中Authorization是AWS4-XX,而1.7.4还是AWS。这样结论就比较清晰了,1.11.19使用的是Signature v4,而1.7.4使用的是Signature V2,而作为服务端的minio只支V4,故而抛错。V4是S3比较安全的signature。

而Ceph支持V2,使用1.7.4的aws-java-sdk去访问Ceph S3 Service是正常的。

回到前面的问题,原因是hadoop和s3cmd使用的aws-java-sdk版本不一样,导致Signature版本号不一致,要么升级hadoop要么强制指定s3cmd版本号。我试过升级hadoop使用的aws-java-sdk版本到1.11.19,编译OK,但是在Spark-shell执行时又引入了其他的依赖问题,不易解决,所以强制s3cmd使用V2是一个比较直接的解决办法。

..
16/07/27 09:59:08 INFO mapreduce.Job:  map 100% reduce 100%
16/07/27 09:59:08 INFO mapreduce.Job: Job job_local1744667392_0001 completed successfully
16/07/27 09:59:08 INFO mapreduce.Job: Counters: 40
	File System Counters
		FILE: Number of bytes read=6473080
		FILE: Number of bytes written=10019892
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=0
		HDFS: Number of bytes written=2747004
		HDFS: Number of read operations=7
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
		S3A: Number of bytes read=5544150
		S3A: Number of bytes written=0
		S3A: Number of read operations=4
		S3A: Number of large read operations=0
		S3A: Number of write operations=0
	Map-Reduce Framework
		Map input records=53940
		Map output records=66022
		Map output bytes=3036163
		Map output materialized bytes=2962944
		Input split bytes=87
		Combine input records=66022
		Combine output records=54025
		Reduce input groups=54025
		Reduce shuffle bytes=2962944
		Reduce input records=54025
		Reduce output records=54025
		Spilled Records=108050
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=22
		Total committed heap usage (bytes)=1029701632
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=2772075
	File Output Format Counters
		Bytes Written=2747004


传送门:

Spark支持S3作为DataSource(一):S3及其开源实现

Spark支持S3作为DataSource(二):Hadoop集成S3 Service

Spark支持S3作为DataSource(三):Spark集成S3 Service

Spark支持S3作为DataSource(四):使用Spark处理存储在S3上的图片文件