前面三篇文章介绍了S3以及如何使用Hadoop和Spark处理S3上的文本文件,但毕竟我们使用S3的目的是为了处理非结构化文件(图片,视频)。本文介绍了如何使用Spark处理存储在S3某一个bucket里大量文件(实际我只放了很少几张图片)的方法。由于我不了解图片处理算法,所以图片的处理只是简单读取了该图片的长度、宽度、拍摄时间、拍摄地点等信息。至于更复杂的图像处理、视频处理,如车牌识别,如果图像处理算法是纯JAVA的实现,那只是代码复杂度的提升;如果算法是C++的实现,需要将C++嵌入到JVM中,本文不涉及。

实际上编码比较简单,代码量也很小,但由于我不太熟悉Spark App,代码写的不太顺利,另外JavaSparkContext.binaryFiles使用的人很少,故此在这里记录下。

主流程

  private static void wordCount(String[] args) {
    if (args.length < 1) {
        System.err.println("Usage: JavaWordCount <file>");
        System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    ctx.hadoopConfiguration().set("fs.s3a.access.key", "yourAccess");
    ctx.hadoopConfiguration().set("fs.s3a.secret.key", "yourKey");
    ctx.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false");
    ctx.hadoopConfiguration().set("fs.s3a.endpoint", "1.2.3.4:80");

    JavaPairRDD<String, PortableDataStream> myRdd = ctx.binaryFiles(args[0]);
    List <Tuple2<String, String>> output = myRdd.map(new PicProcess()).collect();
    for (Tuple2<?,?> tuple : output) {
        System.out.println(tuple._1() + ": " + tuple._2());
    }

    ctx.stop();
  }
  1. 使用binaryFiles(显然)。这个方法要求参数是一个路径,而非文件,类似于wholeTextFiles而非textFile。其返回值为JavaPairRDD,RDD的(K,V)
  2. spark-submit时,最后填写的参数是s3路径,如: s3a://zzz/
  3. JavaPairRDD.map要求一个function,看名字是一个函数/方法,但实际要求的是实现org.apache.spark.api.java.function.Function接口的对象。在这里我给的是一个PicProcess类,代码贴在下面。
  4. map后得到的是JavaRDD,我的处理比较简单,collect得到list后打印出来。

PicProcess类

PicProcess实现了org.apache.spark.api.java.function.Function接口,主要是其中的call方法。

public class PicProcess
        implements org.apache.spark.api.java.function.Function<Tuple2<String, PortableDataStream>, Tuple2<String, String>> {

    public Tuple2<String, String> call(Tuple2<String, PortableDataStream> spTuple2) throws Exception {
        DataInputStream disFile = spTuple2._2().open();

        ImgInfoBean imgInfoBean = null;
        try {
            Metadata metadata = ImageMetadataReader.readMetadata(disFile);
            imgInfoBean = printImageTags(metadata);
        } catch (ImageProcessingException e) {

            System.err.println("ERR IMAGE PROCESS: " + e);
        } catch (IOException e) {
            System.err.println("ERR IO: " + e);
        }

        return new Tuple2<String, String>(spTuple2._1(), imgInfoBean.toString());
    }
}

解释下。

org.apache.spark.api.java.function.Function接口只接收2个参数,一个入一个出,但JavaPairRDD入参是2个,所以需要合并组成一个scala.Tuple2;输出时我也希望是K,V的形式(文件名,文件信息),所以也合并组织成一个scala.Tuple2。

call的结构参考下代码,比较简单。

入参的KV中,V的类型是PortaleDataStream,它的open方法返回DataInputStream,可以直接丢给ImageMetadataReader.readMetadata读取图片的meta信息。

图片处理

    private ImgInfoBean printImageTags(Metadata metadata)
    {
        ImgInfoBean imgInfoBean = new ImgInfoBean ();
        for (Directory directory : metadata.getDirectories()) {
            for (Tag tag : directory.getTags()) {
                String tagName = tag.getTagName();
                String desc = tag.getDescription();
                if (tagName.equals("Image Height")) {
                    //图片高度
                    imgInfoBean.setImgHeight(desc);
                } else if (tagName.equals("Image Width")) {
...
            }
            for (String error : directory.getErrors()){
                System.err.println("ERROR: " + error);
            }
        }
        return imgInfoBean;
    }

    private String pointToLatlong (String point ) {
        Double du = Double.parseDouble(point.substring(0, point.indexOf("°")).trim());
        Double fen = Double.parseDouble(point.substring(point.indexOf("°")+1, point.indexOf("'")).trim());
        Double miao = Double.parseDouble(point.substring(point.indexOf("'")+1, point.indexOf("\"")).trim());
        Double duStr = du + fen / 60 + miao / 60 / 60 ;
        return duStr.toString();
    }

这段代码是从open-open那抄的,基本没改。ImgInfoBean就不贴了,直接看链接就好了。

执行结果

提交到YARN上去:

./bin/spark-submit --principal ieevee/zelda1@ZELDA.COM --keytab /etc/security/ieevee.zelda1.keytab --driver-java-options '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=13838'  --jars /home/ieevee/hadoop/hadoop-2.7.2/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar,/home/ieevee/hadoop/hadoop-2.7.2/share/hadoop/tools/lib/hadoop-aws-2.7.2.jar  --class com.ieevee.dthink.ads.zkcfg.HiveJdbcClient --master yarn /home/ieevee/zkcfg-1.0-SNAPSHOT.jar s3a://zzz/

如上,最后一个参数就是我们要处理的S3 bucket。

结果摘录。看上去,虽然s3的bucket里只有2个图片,spark还是分了2个container来执行。Spark的这段调度后面可以看看,如是否很多每个文件一个task。

16/08/05 19:55:17 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (zelda1:44604) with ID 1
16/08/05 19:55:17 INFO BlockManagerMasterEndpoint: Registering block manager zelda1:44854 with 21.3 GB RAM, BlockManagerId(1, zelda1, 44854)
16/08/05 19:55:17 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (zelda2:33312) with ID 2
16/08/05 19:55:17 INFO BlockManagerMasterEndpoint: Registering block manager zelda2:44510 with 21.3 GB RAM, BlockManagerId(2, zelda2, 44510)
...
16/08/05 19:55:21 INFO DAGScheduler: Job 0 finished: collect at HiveJdbcClient.java:258, took 2.675374 s
s3a://zzz/aaa.jpg: [Picture Information]: high:4208 pixels width:3120 pixels  recording time:2015:03:07 23:07:15  altitude:0 metres latitude:30.14270833333333 longitude:120.18971944444445
s3a://zzz/bbb.jpg: [Picture Information]: high:4208 pixels width:3120 pixels  recording time:2015:03:07 23:07:16  altitude:0 metres latitude:30.14270833333333 longitude:120.18971944444445

来轻松下,比对下其中一张图片的GPS信息:

bbb.jpg


传送门:

Spark支持S3作为DataSource(一):S3及其开源实现

Spark支持S3作为DataSource(二):Hadoop集成S3 Service

Spark支持S3作为DataSource(三):Spark集成S3 Service

Spark支持S3作为DataSource(四):使用Spark处理存储在S3上的图片文件