xenron 发表于 2018-8-27 06:55

使用Spark批量上传图片到HBase中并同时使用OpenCV提取SIFT特征值



最近正在学习利用Spark做图像的分类和检索实验,首先需要上传图像数据(保存在本地文件系统中)到HBase中,提取的图像特征是SIFT,借助OpenCV库提取,刚开始是写一个任务上传图片,然后再写一个任务提取HBase中图像的特征值,考虑到图片的序列化和反序列化会耗费大量的时间,且频繁的磁盘IO对时间消耗也很大,因此,将两个任务合并成一个任务处理,减少处理时间,同时实验过程中也遇到不少错误。批量上传数据并提取SIFT特征

package com.fang.spark

import java.awt.image.{BufferedImage, DataBufferByte}
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.opencv.core.{Core, CvType, Mat, MatOfKeyPoint}
import org.opencv.features2d.{DescriptorExtractor, FeatureDetector}

/**
* Created by hadoop on 16-11-15.
* 批量上传数据到HBase表中,并计算sift值
* 表结构为:
* imagesTable:(image:imageBinary,sift)
* RowKey设计为:
*/

object HBaseUpLoadImages {
def main(args: Array): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("HBaseUpLoadImages").
      setMaster("local").
      set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = new SparkContext(sparkConf)
    //TODO 单机测试情况下,图片文件太多,程序运行失败,打出hs_err_pid***_log日志,具体情况不明
    val imagesRDD = sparkContext.binaryFiles("/home/fang/images/train/test")
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    // val imagesRDD = sparkContext.newAPIHadoopFile("/home/fang/images/train/1")
    // val columnFaminlys :Array = Array("image")
    //createTable(tableName,columnFaminlys,connection)
    imagesRDD.foreachPartition {
      iter => {
      val hbaseConfig = HBaseConfiguration.create()
      hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
      hbaseConfig.set("hbase.zookeeper.quorum", "fang-ubuntu,fei-ubuntu,kun-ubuntu")
      val connection: Connection = ConnectionFactory.createConnection(hbaseConfig);
      val tableName = "imagesTest"
      val table: Table = connection.getTable(TableName.valueOf(tableName))
      iter.foreach {
          imageFile => {
            //val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(imageFile._2.toArray()))
            //println(bi.getColorModel)
            val tempPath = imageFile._1.split("/")
            val len = tempPath.length
            val imageName = tempPath(len - 1)
            //TODO 尝试直接获取BufferedImage数据,提升效率
            val imageBinary: scala.Array = imageFile._2.toArray()
            val put: Put = new Put(Bytes.toBytes(imageName))
            put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("binary"), imageBinary)
            put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("path"), Bytes.toBytes(imageFile._1))
            val sift = getImageSift(imageBinary)
            if(!sift.isEmpty) {
            put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("sift"),sift.get)
            }
            table.put(put)
          }
      }
      connection.close()
      }
    }

    sparkContext.stop()
}

//获取图像的sift特征
def getImageSift(image: Array):Option] = {
    val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(image))
    val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8U)
    //java.lang.UnsupportedOperationException:
    // Provided data element number (188000) should be multiple of the Mat channels count (3)
    //更改CvType.CV_8UC3为CvType.CV_8U,解决上面错误
    // val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8UC3)
    val data = bi.getRaster.getDataBuffer.asInstanceOf.getData
    // if(bi.getColorModel.getNumComponents==3) {
    test_mat.put(0, 0, data)
    val desc = new Mat
    val fd = FeatureDetector.create(FeatureDetector.SIFT)
    val mkp = new MatOfKeyPoint
    fd.detect(test_mat, mkp)
    val de = DescriptorExtractor.create(DescriptorExtractor.SIFT)
    //提取sift特征
    de.compute(test_mat, mkp, desc)
    //判断是否有特征值
    if(desc.rows()!=0) {
      Some(Utils.serializeMat(desc))
    }else{
      None
    }
}

}

需要序列化提取的Mat类型特征值
public class Utils{
public static byte[] serializeMat(Mat mat) {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try {
            float[] data = new float[(int) mat.total() * mat.channels()];
            mat.get(0, 0, data);
            ObjectOutput out = new ObjectOutputStream(bos);
            out.writeObject(data);
            out.close();
            // Get the bytes of the serialized object
            byte[] buf = bos.toByteArray();
            return buf;
      } catch (IOException ioe) {
            ioe.printStackTrace();
            return null;
      }
    }
}

遇到的错误及解决
[*]java.lang.UnsupportedOperationException:Provided data element number (188000) should be multiple of the Mat channels count (3)
更改CvType.CV_8UC3为CvType.CV_8U,解决上面错误
[*]not serialization
没有使用foreachPartition出现下面的错误
org.apache.spark.SparkException: Task not serializable
[*]UnsatisfiedLinkError: Native method not found: org.opencv.core.Mat.n_Mat:()J
忘记加载或加载失败(System.loadLibrary(Core.NATIVE_LIBRARY_NAME))
[*]java.lang.UnsupportedOperationException: Mat data type is not compatible: 0
mat类型数据序列化时没有判断mat是否有值
运行的过程中需要加载opencv lib包
-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib
当上传过多的图像时程序会运行失败,初步推断是内存溢出,下一步解决问题

页: [1]
查看完整版本: 使用Spark批量上传图片到HBase中并同时使用OpenCV提取SIFT特征值