博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
通过自定义SparkSQL外部数据源实现SparkSQL读取HBase
阅读量:6841 次
发布时间:2019-06-26

本文共 10485 字,大约阅读时间需要 34 分钟。

hot3.png

包: sparksql.hbase

HBaseRelation.scala

package sparksql.hbaseimport java.io.Serializableimport org.apache.spark.sql._import org.apache.spark.sql.sources.TableScanimport org.apache.hadoop.hbase.client.{Result}import org.apache.spark.sql._import org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.mapreduce.TableInputFormatimport scala.collection.JavaConversions._import scala.collection.JavaConverters._import scala.collection.mutable.ArrayBufferimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.DataTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.LongTypeimport org.apache.spark.sql.types.IntegerTypeimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.sources.BaseRelationimport sparksql.hbase.hbase._  object Resolver extends  Serializable {   def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {    val cfColArray = hbaseField.fieldName.split(":",-1)    val cfName = cfColArray(0)    val colName =  cfColArray(1)    var fieldRs: Any = null    //resolve row key otherwise resolve column    if(cfName=="" && colName=="key") {      fieldRs = resolveRowKey(result, hbaseField.fieldType)    } else {      fieldRs =  resolveColumn(result, cfName, colName,hbaseField.fieldType)    }    fieldRs  }   def resolveRowKey (result: Result, resultType: String): Any = {     val rowkey = resultType match {      case "string" =>        result.getRow.map(_.toChar).mkString      case "int" =>        result  .getRow.map(_.toChar).mkString.toInt      case "long" =>        result.getRow.map(_.toChar).mkString.toLong    }    rowkey  }   def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {    val column = result.containsColumn(columnFamily.getBytes, columnName.getBytes) match{        case true =>{            resultType match {              case "string" =>                Bytes.toString(result.getValue(columnFamily.getBytes,columnName.getBytes))                //result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString              case "int" =>                Bytes.toInt(result.getValue(columnFamily.getBytes,columnName.getBytes))              case "long" =>                Bytes.toLong(result.getValue(columnFamily.getBytes,columnName.getBytes))              case "float" =>                Bytes.toFloat(result.getValue(columnFamily.getBytes,columnName.getBytes))              case "double" =>                Bytes.toDouble(result.getValue(columnFamily.getBytes,columnName.getBytes))             }        }        case _ => {            resultType match {              case "string" =>                ""              case "int" =>                0              case "long" =>                0             }        }    }    column  }} /**   val hbaseDDL = s"""      |CREATE TEMPORARY TABLE hbase_people      |USING com.shengli.spark.hbase      |OPTIONS (      |  sparksql_table_schema   '(row_key string, name string, age int, job string)',      |   hbase_table_name     'people',      | hbase_table_schema '(:key , profile:name , profile:age , career:job )'      |)""".stripMargin */case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{   val hbaseTableName =  hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))  val hbaseTableSchema =  hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))  val rowRange = hbaseProps.getOrElse("row_range", "->")  //get star row and end row  val range = rowRange.split("->",-1)  val startRowKey = range(0).trim  val endRowKey = range(1).trim   val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field  val registerTableFields = extractRegisterSchema(registerTableSchema)  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)   val hbaseTableFields = feedTypes(tempFieldRelation)  val fieldsRelations =  tableSchemaFieldMapping(hbaseTableFields,registerTableFields)  val queryColumns =  getQueryTargetCloumns(hbaseTableFields)   def  feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) :  Array[HBaseSchemaField] = {         val hbaseFields = mapping.map{           case (k,v) =>               val field = k.copy(fieldType=v.fieldType)               field        }        hbaseFields.toArray  }   def isRowKey(field: HBaseSchemaField) : Boolean = {    val cfColArray = field.fieldName.split(":",-1)    val cfName = cfColArray(0)    val colName =  cfColArray(1)    if(cfName=="" && colName=="key") true else false  }   //eg: f1:col1  f1:col2  f1:col3  f2:col1  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {    var str = ArrayBuffer[String]()    hbaseTableFields.foreach{ field=>         if(!isRowKey(field)) {           str +=  field.fieldName         }    }    str.mkString(" ")  }  lazy val schema = {    val fields = hbaseTableFields.map{ field=>        val name  = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName        val relatedType =  field.fieldType match  {          case "string" =>            SchemaType(StringType,nullable = false)          case "int" =>            SchemaType(IntegerType,nullable = false)          case "long" =>            SchemaType(LongType,nullable = false)        }        StructField(name,relatedType.dataType,relatedType.nullable)    }    StructType(fields)  }   def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField],  registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {       if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")       val rs = externalHBaseTable.zip(registerTable)       rs.toMap  }     /**     * spark sql schema will be register     *   registerTableSchema   '(rowkey string, value string, column_a string)'      */  def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {         val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)         val fieldsArray = fieldsStr.split(",").map(_.trim)         fieldsArray.map{ fildString =>           val splitedField = fildString.split("\\s+", -1)           RegisteredSchemaField(splitedField(0), splitedField(1))         }   }   //externalTableSchema '(:key , f1:col1 )'  def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {        val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)        val fieldsArray = fieldsStr.split(",").map(_.trim)        fieldsArray.map(fildString => HBaseSchemaField(fildString,""))  }     // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.  lazy val buildScan = {     val hbaseConf = HBaseConfiguration.create()    hbaseConf.set("hbase.zookeeper.quorum", "zookeeper-name")    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);     val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(      hbaseConf,      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],      classOf[org.apache.hadoop.hbase.client.Result]    )      val rs = hbaseRdd.map(tuple => tuple._2).map(result => {      var values = new ArrayBuffer[Any]()      hbaseTableFields.foreach{field=>        values += Resolver.resolve(field,result)      }      Row.fromSeq(values.toSeq)    })    rs  }   private case class SchemaType(dataType: DataType, nullable: Boolean)}

DefaultSource.scala

package sparksql.hbaseimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.sources.RelationProvider  class DefaultSource extends RelationProvider {  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {    HBaseRelation(parameters)(sqlContext)  }}

package.scala

package sparksql.hbaseimport org.apache.spark.sql.SQLContextimport scala.collection.immutable.HashMap   package object hbase {   abstract class SchemaField extends Serializable    case class RegisteredSchemaField(fieldName: String, fieldType: String)  extends  SchemaField  with Serializable    case class HBaseSchemaField(fieldName: String, fieldType: String)  extends  SchemaField  with Serializable    case class Parameter(name: String)    protected  val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")  protected  val HBASE_TABLE_NAME = Parameter("hbase_table_name")  protected  val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")  protected  val ROW_RANGE = Parameter("row_range")  /**   * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.   */  implicit class HBaseContext(sqlContext: SQLContext) {    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {      var params = new HashMap[String, String]      params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)      params += ( HBASE_TABLE_NAME.name -> hbaseTableName)      params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)      //get star row and end row      params += ( ROW_RANGE.name -> rowRange)      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));    }  } }

使用示例:

package testimport org.apache.spark.SparkConfimport org.apache.spark.sql.SQLContextimport org.apache.spark.SparkContextobject SparkSqlHbaseTest {  def main(args: Array[String]): Unit = {    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark sql hbase test")    val sc = new SparkContext(sparkConf)    val sqlContext = new SQLContext(sc)    var hbasetable = sqlContext.read.format("sparksql.hbase").options(Map(    "sparksql_table_schema" -> "(key string, title string, url string)",    "hbase_table_name" -> "crawled",    "hbase_table_schema" -> "(:key , data:title , basic:url)"    )).load()    hbasetable.printSchema()    hbasetable.registerTempTable("crawled")    var records = sqlContext.sql("SELECT * from crawled limit 10").collect  }}

转载于:https://my.oschina.net/waterbear/blog/531010

你可能感兴趣的文章
[翻译]MongoDb 架构(MongoDb Architecture)
查看>>
oracle统计数据库所有表的数据记录数SQL
查看>>
随机森林案例分析:德国银行信贷风险分析
查看>>
批量去除歌曲tag标签
查看>>
驰骋工作流引擎设计系列05 启动流程设计
查看>>
Java 启动线程并保持
查看>>
CentOS7使用firewalld打开关闭防火墙与端口
查看>>
开启mysql远程访问的权限
查看>>
st2045 漏洞反弹root shell
查看>>
Debian 系统初体验
查看>>
将Unreal4打包后的工程嵌入到Qt或者桌面中
查看>>
TP 框架没有考虑完善的功能点:1、表达式查询不支持INSTR形式的查询
查看>>
你不可不知的家庭装修禁忌
查看>>
关于i++和++i
查看>>
如何处理win10系统内置Linux系统闪退问题
查看>>
在Ubuntu上通过命令行安装Elisa KDE音乐播放器
查看>>
CentOS下命令行和桌面模式的切换方法
查看>>
linux下socket编程
查看>>
android中解压文件
查看>>
如何进行大数据分析及处理?
查看>>