package com.hikcreate.data.sync

import java.sql.Connection
import java.util.Locale

import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{Tools, ZkManager}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.{DateTime, Duration}
import org.joda.time.format.DateTimeFormat

import scala.collection.mutable.ArrayBuffer

object SyncHive extends Sparking with Logging {



  /**
   * Entry point: consumes JSON messages from the configured Kafka topics and
   * writes them into Hive, committing Kafka offsets to ZooKeeper only after a
   * batch has been processed (at-least-once delivery).
   *
   * Flow per 5-second micro-batch:
   *   1. capture the batch's offset ranges (driver side) in `offsetRanges`,
   *   2. parse each record, group by (msgId, dataType) table key,
   *   3. write every partition's groups to Hive via `processRow2`,
   *   4. persist the end offsets back to ZooKeeper.
   */
  def main(args: Array[String]): Unit = {
    val zkManager = ZkManager(Const.zkKafka)
    val kafkaParams = getKafkaParams(Const.bootstrap,Const.hiveGroupId)
    // resume from the offsets previously committed to ZooKeeper for this group
    val offsets = zkManager.getBeginOffset(Const.applicationTopic,Const.hiveGroupId)
    // driver-side buffer; refilled by the transform below on every batch
    val offsetRanges = new ArrayBuffer[OffsetRange]()
    val ssc = new StreamingContext(conf,Seconds(5))
    val inputStream = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Const.applicationTopic,kafkaParams,offsets))
    // transform runs on the driver for each batch: snapshot this batch's offset
    // ranges before any action so they can be committed after processing
    inputStream.transform{ rdd =>
      offsetRanges.clear()
      offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges:_*)
      rdd
    }.map(x=>x.value()).foreachRDD{ rdd =>
      if(!rdd.isEmpty()){
        val startTime = DateTime.now()
        // group records by (msgId, dataType) so each Hive table is written per partition
        rdd.map(JSON.parseObject).groupBy(json=>TableKey(Option(json.getString("msgId")),Option(json.getString("dataType")))).foreachPartition(x=>processRow2(x))
        // commit offsets only after the batch was written => at-least-once semantics
        zkManager.saveEndOffset(offsetRanges,Const.hiveGroupId)
        offsetRanges.foreach{x=>
          println(x)
        }
//        println(offsetRanges(0))
        val endTime = DateTime.now()
        println(DateTime.now()+"==============time token: "+new Duration(startTime,endTime).getMillis+"ms==============")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Writes one partition's worth of messages, grouped by [[TableKey]], into Hive.
   *
   * Routing rules (in order):
   *   - table key not in `Const.tableMap`            -> unknown-message table
   *   - no msgId (base/static info)                  -> plain insert via writeBaseInfoHive
   *   - 0x1200/0x1203 (batched location re-upload)   -> flatten, geocode in groups of 20, insert
   *   - 0x1200/0x1202 (location report)              -> geocode in groups of 20, insert
   *   - 0x1400/0x1402 (alarm upload)                 -> known warnTypes enriched + inserted,
   *                                                     unknown warnTypes to the unknown table
   *   - anything else                                -> direct insert
   *
   * Geocoding is batched (grouped(20)) to limit calls to the reverse-geocode service.
   * A failure for one group is logged and does not abort the rest of the partition.
   *
   * @param x iterator of (table key, messages for that key) pairs for this partition
   */
  def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
    DbClient.init(Const.hivePoolName,Const.hiveDriver,Const.hiveUrl, Const.hiveUsername,Const.hivePassword)
    DbClient.usingDB(Const.hivePoolName) { db =>
      x.foreach { case (tableKey, jsons) =>
        try {
          if (!Const.tableMap.contains(tableKey)) { // 未知消息 (no table mapping)
            jsons.foreach(json => writeUnknown(db.conn, Const.unKnownTable, json))
          } else if (tableKey.msgId.isEmpty) { // 基础信息 (base info: no msgId field)
            // NOTE: was `tableKey.msgId == null`, which is never true for an Option
            // built via Option(...) — base-info rows used to fall through to the
            // generic writeHiveTable branch below.
            jsons.foreach(json => writeBaseInfoHive(db.conn, Const.tableMap(tableKey), json))
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) { // 定位补报 (batched location)
            val expanded = jsons.flatMap(json => Tools.addLocation(json))
            expanded.toList.grouped(20).foreach { sub =>
              // lon/lat arrive scaled by 1e6
              val codes = Tools.getAddressAndLocationCodes(sub.map(j => (j.getDouble("lon") / 1000000, j.getDouble("lat") / 1000000)))
              sub.zip(codes).foreach { case (json, location) =>
                json.put("districtcode", location._2)
                writeHiveTable(db.conn, Const.tableMap(tableKey), json)
              }
            }
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) { // 定位消息 (location report)
            jsons.toList.grouped(20).foreach { sub =>
              val codes = Tools.getAddressAndLocationCodes(sub.map(j => (j.getDouble("lon") / 1000000, j.getDouble("lat") / 1000000)))
              sub.zip(codes).foreach { case (json, location) =>
                json.put("districtcode", location._2)
                writeHiveTable(db.conn, Const.tableMap(tableKey), json)
              }
            }
          } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) { // 报警上传 (alarm upload)
            // split messages into known warn types (to enrich + insert) and unknown ones
            val useFul = new ArrayBuffer[JSONObject]()
            jsons.foreach { json =>
              if (Const.warnTypes.contains(json.getString("warnType"))) {
                useFul.append(json)
              } else {
                writeUnknown(db.conn, Const.unKnownTable, json)
              }
            }
            useFul.toList.grouped(20).foreach { sub =>
              val codes = Tools.getAddressAndLocationCodes(sub.map { t =>
                val infoJson = Tools.getInfoContentJsonobj(t.getString("infoContent"))
                (infoJson.getDouble("LONGITUDE") / 1000000, infoJson.getDouble("LATITUDE") / 1000000)
              })
              sub.zip(codes).foreach { case (json, location) =>
                // copy coordinates/event type out of the nested infoContent payload
                val infoJson = Tools.getInfoContentJsonobj(json.getString("infoContent"))
                json.put("longitude", infoJson.get("LONGITUDE"))
                json.put("latitude", infoJson.get("LATITUDE"))
                json.put("eventtype", infoJson.get("EVENT_TYPE"))
                json.put("fulladdress", location._1)
                json.put("districtcode", location._2)
                writeHiveTable(db.conn, Const.tableMap(tableKey), json)
              }
            }
          } else { // 除了以上几种情况外的消息 (everything else)
            jsons.foreach(json => writeHiveTable(db.conn, Const.tableMap(tableKey), json))
          }
        } catch {
          case e: Exception =>
            println("发生插入错误的消息" + jsons.toString())
            e.printStackTrace()
        }
      }
    }
  }

  /**
   * Writes one partition of raw JSON message strings into Hive, one record at a time.
   * Unknown table keys go to the unknown-message table, records without a msgId are
   * treated as base info, 0x1200/0x1203 batched-location records are expanded first,
   * and everything else is inserted directly. A bad record is logged and skipped.
   *
   * @param iterator raw JSON strings from Kafka for this partition
   */
  def processRow(iterator: Iterator[String]): Unit = {
    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
    DbClient.usingDB(Const.hivePoolName) { db =>
      iterator.foreach { raw =>
        try {
          val json = JSON.parseObject(raw)
          val tableKey = TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))
          if (!Const.tableMap.contains(tableKey)) {
            // no Hive table mapped for this (msgId, dataType) pair
            writeUnknown(db.conn, Const.unKnownTable, json)
          } else if (!json.containsKey("msgId")) {
            // base/static info record
            writeBaseInfoHive(db.conn, Const.tableMap(tableKey), json)
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
            // batched location re-upload: expand into individual rows first
            val targetTable = Const.tableMap(tableKey)
            Tools.addLocation(json).foreach(row => writeHiveTable(db.conn, targetTable, row))
          } else {
            writeHiveTable(db.conn, Const.tableMap(tableKey), json)
          }
        } catch {
          case e: Exception =>
            error(raw)
            e.printStackTrace()
        }
      }
    }
  }

  /**
   * Inserts a base-info JSON record into `ods.&lt;tableName&gt;` using a prepared
   * statement; column names are the record's keys lower-cased.
   *
   * Insert failures are logged (message + payload) and swallowed so one bad
   * record does not abort the surrounding partition loop.
   *
   * @param conn      open JDBC connection to Hive
   * @param tableName target table name (without the `ods.` prefix)
   * @param json      flat JSON record; every key becomes a column
   */
  def writeBaseInfoHive(conn:Connection,tableName:String,json:JSONObject): Unit = {
    val keys = json.keySet().toArray(Array[String]())
    val insertSql =
      s"""
         |insert into ods.$tableName
         |(${keys.map(x=>x.toLowerCase()).mkString(",")})
         |values (${keys.map(_ => "?").mkString(",")})
       """.stripMargin
    val stmt = conn.prepareStatement(insertSql)
    try{
      // bind inside try/finally: a binding failure previously leaked the statement
      (1 to keys.length).foreach{ index => stmt.setObject(index,json.get(keys(index-1)))}
      stmt.execute()
      info(s"insert data to HiveTable $tableName SUCCESS")
    }catch {
      case e:Exception=>
        println("Exception Messages==>"+e)
        println(s"hive table $tableName insert data failed==>"+json)
    }finally {
      stmt.close()
    }
  }

  /**
   * Archives an unroutable message into the unknown-message table, partitioned
   * by the current day, storing the receive timestamp and the raw JSON payload.
   *
   * The timestamp and payload are bound as statement parameters — the previous
   * string-concatenated SQL broke (and was injectable) whenever the JSON
   * contained a single quote. Exceptions propagate to the caller; the statement
   * is always closed.
   *
   * @param conn      open JDBC connection to Hive
   * @param tableName unknown-message table name (without the `ods.` prefix)
   * @param json      the original message payload
   */
  def writeUnknown(conn:Connection,tableName:String,json:JSONObject): Unit = {
    val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
    val day = new DateTime().toString("yyyy-MM-dd")
    // partition value is generated locally (date format), safe to inline
    val insertSql = s"insert into ods.$tableName partition( day = '$day' ) (datetime,jsondata) values(?,?)"
    val insertStmt = conn.prepareStatement(insertSql)
    try{
      insertStmt.setString(1, dateTime)
      insertStmt.setString(2, json.toJSONString)
      insertStmt.execute()
      info(s"insert UnknownData date to HiveTable $tableName SUCCESS")
    } finally {
      insertStmt.close()
    }
  }

  /**
   * Inserts one JSON message into a day-partitioned Hive table, creating the
   * partition first if it does not exist.
   *
   * The partition day is derived from the first timestamp field present:
   * `dateTime` (string), `warnTime` (epoch seconds), then `businessTime`
   * (string). A record with none of these will throw and is handled/logged by
   * the caller.
   *
   * Insert failures are logged and swallowed; both statements are always closed
   * (binding previously happened outside try/finally and could leak them).
   *
   * @param conn      open JDBC connection to Hive
   * @param tableName target table name
   * @param json      flat JSON record; every key becomes a column
   */
  def writeHiveTable(conn:Connection,tableName:String,json:JSONObject): Unit = {
    val keys = json.keySet().toArray(Array[String]())
    // pick the partition day from whichever timestamp field this message type carries
    val day = if (keys.contains("dateTime")) {
      DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    } else if (keys.contains("warnTime")) {
      // warnTime is epoch seconds
      new DateTime(json.getLong("warnTime")*1000).toString("yyyy-MM-dd",Locale.CHINESE)
    } else {
      DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    }
    val createPartitionSql =s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
    // NOTE(review): alter targets `ods.$tableName` but the insert targets bare
    // `$tableName` — presumably the session's default database is ods; confirm.
    val sql =
      s"""
         |insert into $tableName
         |partition( partitionday = '$day' )
         |(${keys.map(x=>x.toLowerCase()).mkString(",")})
         |values (${keys.map(_ => "?").mkString(",")})
       """.stripMargin
    val createPartitionStmt = conn.prepareStatement(createPartitionSql)
    try {
      val stmt = conn.prepareStatement(sql)
      try {
        (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
        createPartitionStmt.execute()
        stmt.execute()
        info(s"insert data to HiveTable $tableName SUCCESS")
      } catch {
        case e:Exception=>
          println("Exception Messages==>"+e)
          println(s"hive table $tableName insert data failed==>"+json)
      } finally {
        stmt.close()
      }
    } finally {
      createPartitionStmt.close()
    }
  }
}