Skip to content

added HBase timestamp support#297

Open
sbarnoud wants to merge 2 commits intohortonworks-spark:masterfrom
sbarnoud:added-timestamp-support
Open

added HBase timestamp support#297
sbarnoud wants to merge 2 commits intohortonworks-spark:masterfrom
sbarnoud:added-timestamp-support

Conversation

@sbarnoud
Copy link
Copy Markdown

#210

What changes were proposed in this pull request?

Added support of HBase timestamp.

How was this patch tested?

New TU. Added control in TU that "reserved" column families (rowkey and timestamp) are not created.

val tsSeq = {
if (relation.catalog.ts.isPresent) {
val f = relation.catalog.getTimestamp.head
Seq((f, result.rawCells()(0).getTimestamp)).toMap
Copy link
Copy Markdown

@regata regata Apr 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbarnoud when there are multiple cells in a single column, each cell will have different timestamp. However, this code reads the timestamp only from the latest cell. Is this intended behavior?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plus tsSeq is not used inside buildRows

}
}

val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x =>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about this?

    val tsField = relation.catalog.getTimestamp.head // ADDED
    val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x =>
      import scala.collection.JavaConverters.asScalaBufferConverter
      val dataType = SHCDataTypeFactory.create(x)
      val kvs = result.getColumnCells(
        relation.catalog.shcTableCoder.toBytes(x.cf),
        relation.catalog.shcTableCoder.toBytes(x.col)).asScala

      kvs.map(kv => {
        val v = CellUtil.cloneValue(kv)
        (kv.getTimestamp, x -> dataType.fromBytes(v))
      }).toMap.withDefaultValue(x -> null)
    }

    val ts = valueSeq.foldLeft(Set.empty[Long])((acc, map) => acc ++ map.keySet)
    //we are loosing duplicate here, because we didn't support passing version (timestamp) to the row
    ts.map(version => {
      val tsSeq = Seq((tsField, version)).toMap // ADDED
      keySeq ++ tsSeq ++ valueSeq.map(_.apply(version)).toMap // MODIFIED
    }).map { unioned =>
      // Return the row ordered by the requested order
      Row.fromSeq(fields.map(unioned.get(_).getOrElse(null)))
    }

The only extra change that is required is to specify timestamp column when mergeToLatest is false

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad.In my use case, i have a single version, and i forget this point.
Yes, we should have the ts corresponding to the version!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants