Spark with S3 as a DataSource (Part 2): Integrating Hadoop with an S3 Service
by 伊布
Hadoop's integration with S3 is covered in the official documentation. I was initially confused by s3/s3n/s3a: as far as AWS is concerned there is only one S3 protocol, but for historical reasons Hadoop implemented three successive clients for it, s3 -> s3n -> s3a, each supporting larger files than the previous one. Since we run a self-hosted S3 service and only s3a supports setting a custom endpoint, s3a was the only choice, even though it was still described as "having bugs" at the time.
Configuring S3 Integration in Hadoop
Edit core-site.xml and add the following configuration:
<property>
  <name>fs.s3a.access.key</name>
  <value>xxxxxx</value>
  <description>AWS access key ID. Omit for Role-based authentication.</description>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>yyyyy</value>
  <description>AWS secret key</description>
</property>
<property>
  <name>fs.s3a.endpoint</name>
  <value>192.168.103.224:80</value>
  <description>AWS S3 endpoint to connect to. An up-to-date list is
    provided in the AWS Documentation: regions and endpoints. Without this
    property, the standard region (s3.amazonaws.com) is assumed.
  </description>
</property>
<property>
  <name>fs.s3a.connection.ssl.enabled</name>
  <value>false</value>
  <description>Enables or disables SSL connections to S3.</description>
</property>
A quick rundown of what each option does. These keys are defined in org.apache.hadoop.fs.s3a.Constants and will come up again later when integrating Spark with S3.
- fs.s3a.access.key / fs.s3a.secret.key: the access key and secret key.
- fs.s3a.endpoint: the address and port of the S3 service. Fill in an IP address here, not an FQDN or hostname, otherwise you will get "Caused by: java.net.UnknownHostException: xxx.zelda2". With a hostname, the client addresses the service the AWS way, i.e. as bucketname.awsxxx.com.
- fs.s3a.connection.ssl.enabled: whether to use SSL. Our S3 service speaks plain HTTP, so this must be false.
Remember to restart HDFS/YARN after changing the configuration.
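The same fs.s3a.* keys can also be set programmatically on a Hadoop Configuration, which is handy for quick experiments outside core-site.xml. A minimal sketch, assuming the endpoint and placeholder credentials above and the hypothetical bucket xxx:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AConfigCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same keys as in core-site.xml (see org.apache.hadoop.fs.s3a.Constants).
        conf.set("fs.s3a.access.key", "xxxxxx");
        conf.set("fs.s3a.secret.key", "yyyyy");
        conf.set("fs.s3a.endpoint", "192.168.103.224:80");        // IP:port, not a hostname
        conf.setBoolean("fs.s3a.connection.ssl.enabled", false);  // plain HTTP

        // getFileSystem() resolves the s3a:// scheme to the S3A filesystem client.
        FileSystem fs = new Path("s3a://xxx/").getFileSystem(conf);
        System.out.println(fs.exists(new Path("s3a://xxx/diamonds.csv")));
    }
}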
Check that uploading and downloading files works:
$ ./bin/hadoop fs -ls s3a://xxx/diamonds.csv
-rw-rw-rw-   1    2772075 2016-07-26 09:59 s3a://xxx/diamonds.csv
$ ./bin/hadoop distcp /mapred s3a://xxx/
$ ./bin/hadoop fs -ls s3a://xxx/
Found 2 items
-rw-rw-rw-   1    2772075 2016-07-26 09:59 s3a://xxx/diamonds.csv
-rw-rw-rw-   1       5953 2016-07-26 09:55 s3a://xxx/mapred
$ ./bin/hadoop fs -get s3a://xxx/mapred
Note the trailing slash after the bucket name in fs -ls; without it the command fails with ls: `s3a://xxx': No such file or directory. distcp can copy files from HDFS into S3, although plain fs -put is usually more convenient.
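The same put/ls operations can also be driven from Java through the Hadoop FileSystem API, which is ultimately what higher layers such as Spark do. A rough sketch, assuming core-site.xml is on the classpath and reusing the placeholder bucket and file names above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3APutLs {
    public static void main(String[] args) throws Exception {
        // fs.s3a.* settings are picked up from core-site.xml on the classpath.
        FileSystem s3 = FileSystem.get(URI.create("s3a://xxx/"), new Configuration());

        // Equivalent of "hadoop fs -put": copy a local file into the bucket.
        s3.copyFromLocalFile(new Path("/tmp/diamonds.csv"), new Path("s3a://xxx/diamonds.csv"));

        // Equivalent of "hadoop fs -ls s3a://xxx/" -- note the trailing slash on the bucket.
        for (FileStatus st : s3.listStatus(new Path("s3a://xxx/"))) {
            System.out.println(st.getLen() + "\t" + st.getPath());
        }
    }
}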
Now run a MapReduce job:
$ ./bin/hadoop jar /home/ieevee/hadoop/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount s3a://xxx/diamonds.csv /xxx2
16/07/27 09:59:05 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
..
16/07/27 09:59:06 INFO mapred.MapTask: Processing split: s3a://xxx/diamonds.csv:0+2772075
16/07/27 09:59:06 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/07/27 09:59:06 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
..
16/07/27 09:59:08 INFO mapreduce.Job: map 100% reduce 100%
16/07/27 09:59:08 INFO mapreduce.Job: Job job_local1744667392_0001 completed successfully
16/07/27 09:59:08 INFO mapreduce.Job: Counters: 40
    File System Counters
        FILE: Number of bytes read=6473080
        FILE: Number of bytes written=10019892
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=0
        HDFS: Number of bytes written=2747004
        HDFS: Number of read operations=7
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
        S3A: Number of bytes read=5544150
        S3A: Number of bytes written=0
        S3A: Number of read operations=4
        S3A: Number of large read operations=0
        S3A: Number of write operations=0
    Map-Reduce Framework
        Map input records=53940
        Map output records=66022
        Map output bytes=3036163
        Map output materialized bytes=2962944
        Input split bytes=87
        Combine input records=66022
        Combine output records=54025
        Reduce input groups=54025
        Reduce shuffle bytes=2962944
        Reduce input records=54025
        Reduce output records=54025
        Spilled Records=108050
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=22
        Total committed heap usage (bytes)=1029701632
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2772075
    File Output Format Counters
        Bytes Written=2747004
Signature V2/V4
OK, Hadoop can now talk to the S3 service, but an unfortunate discovery followed: s3cmd can no longer read the bucket.
ieevee@zelda1:~$ s3cmd ls
2016-07-25 10:12 s3://xxx
ieevee@zelda1:~$ s3cmd ls s3://xxx
ERROR: S3 error: 403 (SignatureDoesNotMatch)
It looks as if the access key I put in .s3cfg is wrong, but it is not. Turning on s3cmd's debug output shows:
$ s3cmd -d ls s3://xxx
DEBUG: s3cmd version 1.6.1
DEBUG: ConfigParser: Reading file '/home/ieevee/.s3cfg'
..
DEBUG: CreateRequest: resource[uri]=/
DEBUG: Using signature v4
It turns out s3cmd defaults to the v4 signature while Hadoop is still on v2 (don't ask how I knew, intuition...). Adding the flag fixes it:
$ s3cmd --signature-v2 ls s3://xxx
2016-07-26 01:59 2772075 s3://xxx/diamonds.csv
2016-07-26 01:55 5953 s3://xxx/mapred
!! This is a problem that puzzled me for a long time: the signature version.
While debugging Spark against the S3 service I used minio for a while to provide S3. My buckets and objects were created through the minio Browser, but whenever Spark touched S3 it failed with Bad Request (Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: null), while the S3 client code I had written locally on Windows (below), with essentially the same logic, worked fine.
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;

String accessKey = "xxxx";
String secretKey = "yyyy/zzzz";
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);

// Plain HTTP against the self-hosted endpoint, no TLS.
ClientConfiguration clientConfig = new ClientConfiguration();
clientConfig.setProtocol(Protocol.HTTP);

AmazonS3Client conn = new AmazonS3Client(credentials, clientConfig);
conn.setEndpoint("192.168.103.228:9000");
if (!conn.doesBucketExist("xxx")) {
    System.out.println("Bucket " + "xxx" + " does not exist");
}
The difference between the two is that my own project's Maven build depends on aws-java-sdk 1.11.19, while Hadoop still bundles 1.7.4. Capturing and comparing the traffic, a successful HTTP request from 1.11.19 looks like this:
HEAD /xxx/ HTTP/1.1..
Host: 192.168.103.228:9000..
x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855..
Authorization: AWS4-HMAC-SHA256 Credential=E1S2J126W149HX250245/20160725/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=ed31685d71e03518b0f048798ca7d125246f155d6e5c9bfa9cf06524c5251002..
X-Amz-Date: 20160725T091527Z..
User-Agent: aws-sdk-java/1.11.19 Windows_7/6.1 Java_HotSpot(TM)_64-Bit_Server_VM/25.40-b25/1.8.0_40..
amz-sdk-invocation-id: 7ba4c0c3-f73a-a8cb-f6ba-f104e66688fc..
amz-sdk-retry: 0/0/500..
Content-Type: application/octet-stream..
while the bad-request HTTP request from 1.7.4 looks like this:
HEAD /xxx/ HTTP/1.1....
Host: 192.168.103.228:9000..
Authorization: AWS 2F9ZA80YUF2EFIXT2HD7:3Nb0IOuR1MxDzGSbTmwq95ySc0s=....
User-Agent: aws-sdk-java/1.7.4 Windows_7/6.1 Java_HotSpot(TM)_64-Bit_Server_VM/25.40-b25/1.8.0_40....
Date: Mon, 25 Jul 2016 12:01:36 GMT....
Content-Type: application/x-www-form-urlencoded; charset=utf-8....
Connection: Keep-Alive....
Note that the Authorization header from 1.11.19 starts with AWS4-..., while 1.7.4 still sends plain AWS. The conclusion is now clear: 1.11.19 signs requests with Signature V4, 1.7.4 with Signature V2, and minio as the server only supports V4, hence the error. V4 is the more secure of the two S3 signature schemes.
Ceph, on the other hand, supports V2, so accessing a Ceph S3 service with aws-java-sdk 1.7.4 works fine.
Back to the earlier problem: Hadoop (via its bundled aws-java-sdk 1.7.4) and s3cmd end up using different signature versions, so either upgrade Hadoop's SDK or force s3cmd onto the older signature. I tried upgrading Hadoop's aws-java-sdk to 1.11.19; it compiled, but spark-shell then hit other dependency problems that were not easy to resolve, so forcing s3cmd to use V2 is the more straightforward fix.
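Going the other way is also possible when you control the client code: the 1.11.x aws-java-sdk can be told to fall back to the legacy V2 signer, which is what an older Ceph RGW expects. A sketch under that assumption, reusing the placeholder endpoint and keys from the snippet above; setSignerOverride("S3SignerType") selects the old S3 (V2) signer in the 1.x SDK, as far as I can tell:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;

public class ForceV2Signer {
    public static void main(String[] args) {
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProtocol(Protocol.HTTP);
        // Fall back from the default AWS4-HMAC-SHA256 (V4) signer to the legacy S3 (V2) signer.
        clientConfig.setSignerOverride("S3SignerType");

        AmazonS3Client conn = new AmazonS3Client(
                new BasicAWSCredentials("xxxx", "yyyy/zzzz"), clientConfig);
        conn.setEndpoint("192.168.103.228:9000");
        System.out.println(conn.doesBucketExist("xxx"));
    }
}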
Other posts in this series:
Spark with S3 as a DataSource (Part 1): S3 and Its Open-Source Implementations
Spark with S3 as a DataSource (Part 2): Integrating Hadoop with an S3 Service