// VehPicturePicService.scala
package com.hikcreate.picservice

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Properties, UUID}

import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * @author yulong shu
 * @date 2021/8/16 11:31
 * @version 1.0
 */
object VehPicturePicService {

  /** Entry point: builds a Hive-enabled SparkSession and runs the sync job. */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
    doTask(sparkSession, args)
  }

  /**
   * Reads vehicle-picture rows from an Oracle table, uploads each photo blob
   * to FastDFS, deletes the FastDFS files still referenced by the existing
   * Hive output table, then overwrites the output table with the new rows.
   *
   * @param sparkSession active Hive-enabled session
   * @param args         key=value overrides: outputTable, url, user, password, inputTable
   */
  protected def doTask(sparkSession: SparkSession, args: Array[String]): Unit = {
    val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
    val outputTable = map.getOrElse("outputTable", "ods_jg_yp.ods_veh_picture")
    val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
    val dbuser = map.getOrElse("user", "gyjg")
    val dbpassword = map.getOrElse("password", "gyjg2018")
    val inputTable = map.getOrElse("inputTable", "veh_picture")

    val properties = new Properties()
    properties.put("driver", "oracle.jdbc.driver.OracleDriver")
    properties.put("user", dbuser)
    properties.put("password", dbpassword)

    // Read Oracle with 10 parallel partitions, bucketed by the first digit
    // found in the reversed key XH so rows spread roughly evenly.
    val predicates: Array[String] =
      (0 until 10).map(i => s" REGEXP_SUBSTR(REVERSE(XH),'[0-9]') = $i ").toArray
    val vehDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)

    import sparkSession.implicits._
    val resultDF: DataFrame = vehDF.mapPartitions { iter =>
      val fdfs = new FastdfsUtils()
      fdfs.init()
      // Materialize the partition eagerly so the FastDFS client can be closed
      // before the iterator is handed back (mapPartitions results are lazy;
      // the original never closed this client).
      val pics = iter.map { data =>
        val xh: String = data.getAs[String]("XH")
        val hpzl: String = data.getAs[String]("HPZL")
        val hphm: String = data.getAs[String]("HPHM")
        val zp: Array[Byte] = data.getAs[Array[Byte]]("ZP")
        // Upload only real photo payloads; tiny/empty blobs yield "".
        val zpUrl =
          if (zp != null && zp.length > 10) fdfs.uploadFile(zp, "jpeg")
          else ""
        val gxsjStr =
          Option(data.getAs[Timestamp]("GXSJ")).map(_.toString).getOrElse("")
        VehPicture(xh, hpzl, hphm, zpUrl, gxsjStr)
      }.toList
      fdfs.close()
      pics.iterator
    }.toDF()
    resultDF.createOrReplaceTempView("veh_picture")

    // After processing the new pictures, delete the old FastDFS files that the
    // current output table still references. head(1) is evaluated once instead
    // of the original double take(5)/take(10) (each triggered a Spark job, and
    // take() never returns null).
    if (resultDF.head(1).nonEmpty) {
      val photoDF: DataFrame = sparkSession.sql(
        s"""
           |select
           |zp
           |from
           |${outputTable}
           |""".stripMargin)
      if (photoDF.head(1).nonEmpty) {
        photoDF.foreachPartition(
          iter => {
            val fdfs = new FastdfsUtils
            fdfs.init()
            // BUG FIX: the original used iter.map(...), which is lazy and was
            // never consumed, so no deletion ever ran. foreach is eager.
            iter.foreach(
              data => {
                val zp = data.getAs[String]("zp")
                // Skip rows that never had a photo (stored as "").
                if (zp != null && zp.nonEmpty) {
                  fdfs.deleteFIle(zp)
                }
              }
            )
            fdfs.close()
          }
        )
      }
    }

    sparkSession.sql(s"insert overwrite table ${outputTable} select * from veh_picture")
  }
}

case class VehPicture(xh:String,hpzl:String,hphm:String,zp:String,gxsj:String)