# Upsert Data and Merge UDF Tutorial

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

LakeSoul supports updating a subset of fields in data that has already been written to the lake, without overwriting the entire table. This avoids a heavy, resource-intensive rewrite.

For example, consider a table with the following data, where `id` is the primary key (i.e. `hashPartitions`). Suppose a field such as `phone_number` needs to be modified for a row identified by its primary key.

| id  | name | phone_number | address   | job   | company   |
|-----|------|--------------|-----------|-------|-----------|
| 1   | Jake | 13700001111  | address_1 | job_1 | company_1 |
| 2   | Make | 13511110000  | address_2 | job_2 | company_2 |

Upsert can be used to update and overwrite existing fields. The upserted data must include the primary key (e.g. `id`) and the fields to be modified (e.g. `address`). Reading the whole table again then shows the modified `address` value.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._

val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

// Initial table contents; "id" is the primary key.
val df = Seq(
  ("1", "Jake", "13700001111", "address_1", "job_1", "company_1"),
  ("2", "Make", "13511110000", "address_2", "job_2", "company_2")
).toDF("id", "name", "phone_number", "address", "job", "company")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

// Create the table with "id" as the hash partition (primary key) column.
df.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions", "id")
  .option("hashBucketNum", "2")
  .save(tablePath)

// Upsert only the primary key and the column to change.
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("1", "address_1_1")).toDF("id", "address")
lakeSoulTable.upsert(extraDF)
lakeSoulTable.toDF.show()

/**
 *  result:
 *  +---+----+------------+-----------+-----+---------+
 *  | id|name|phone_number|    address|  job|  company|
 *  +---+----+------------+-----------+-----+---------+
 *  |  1|Jake| 13700001111|address_1_1|job_1|company_1|
 *  |  2|Make| 13511110000|  address_2|job_2|company_2|
 *  +---+----+------------+-----------+-----+---------+
 */
```

## Customize Merge Logic
Field updates in LakeSoul follow the default merge rule: after data is upserted, the latest record provides the value of each changed field (see `org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.DefaultMergeOp`). On top of this, LakeSoul provides several built-in merge operators, including summing Int/Long fields (`MergeOpInt`/`MergeOpLong`), updating only non-null fields (`MergeNonNullOp`), and concatenating strings with `,`.
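
To make these merge rules concrete, the plain-Scala sketch below applies each rule to the sequence of values that one field accumulates across successive upserts, ordered oldest to newest. It does not call LakeSoul. The helper names (`mergeLatest`, `mergeSum`, `mergeNonNull`, `mergeConcat`) are hypothetical and only model the semantics described above, not the actual operator implementations.

```scala
// Illustrative model only: each Seq holds one field's values across
// successive upserts, ordered oldest to newest. None of these helpers
// are LakeSoul APIs.
object MergeRulesSketch {
  // Default rule: the most recently written value wins.
  def mergeLatest[T](versions: Seq[T]): T = versions.last

  // Sum rule for numeric fields: add up all written values.
  def mergeSum(versions: Seq[Long]): Long = versions.sum

  // Non-null rule: the most recent non-null value wins
  // (returns null if every value is null).
  def mergeNonNull(versions: Seq[String]): String =
    versions.filter(_ != null).lastOption.orNull

  // Concatenation rule: join all written values with ",".
  def mergeConcat(versions: Seq[String]): String = versions.mkString(",")

  def main(args: Array[String]): Unit = {
    println(mergeLatest(Seq("address_1", "address_1_1"))) // address_1_1
    println(mergeSum(Seq(1L, 2L, 3L)))                    // 6
    println(mergeNonNull(Seq("Jake", null)))              // Jake
    println(mergeConcat(Seq("a", "b")))                   // a,b
  }
}
```

The point of the model is that a merge operator is just a function from a field's version history to a single value, so different fields of the same table can use different rules.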

The following example uses the non-null update operator (`MergeNonNullOp`) on the same sample table. Data is still written with upsert. When reading, register the merge logic first and then apply it in the query.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeNonNullOp
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql._

val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

// Initial table contents; "id" is the primary key.
val df = Seq(
  ("1", "Jake", "13700001111", "address_1", "job_1", "company_1"),
  ("2", "Make", "13511110000", "address_2", "job_2", "company_2")
).toDF("id", "name", "phone_number", "address", "job", "company")

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

df.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions", "id")
  .option("hashBucketNum", "2")
  .save(tablePath)

val lakeSoulTable = LakeSoulTable.forPath(tablePath)
// The "null" strings in the name column are the values that must not
// replace the existing names (see the result below).
val extraDF = Seq(
  ("1", "null", "13100001111", "address_1_1", "job_1_1", "company_1_1"),
  ("2", "null", "13111110000", "address_2_2", "job_2_2", "company_2_2")
).toDF("id", "name", "phone_number", "address", "job", "company")

// Register the merge operator under a name usable in SQL expressions.
new MergeNonNullOp().register(spark, "NotNullOp")
lakeSoulTable.toDF.show()
lakeSoulTable.upsert(extraDF)
// Apply the registered operator to the name column when reading.
lakeSoulTable.toDF.withColumn("name", expr("NotNullOp(name)")).show()

/**
 *  result
 *  +---+----+------------+-----------+-------+-----------+
 *  | id|name|phone_number|    address|    job|    company|
 *  +---+----+------------+-----------+-------+-----------+
 *  |  1|Jake| 13100001111|address_1_1|job_1_1|company_1_1|
 *  |  2|Make| 13111110000|address_2_2|job_2_2|company_2_2|
 *  +---+----+------------+-----------+-------+-----------+
 */
```

You can also define your own merge logic by implementing the trait `org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeOperator` to achieve efficient data updates.
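
As a rough sketch of what a custom operator might look like, the example below defines a local stand-in trait with a single `mergeData(input: Seq[T]): T` method and implements a "keep the maximum" rule on top of it. The stand-in `MergeOperatorSketch`, the method shape, the oldest-first ordering, and the `MergeOpMaxLong` class are all assumptions for illustration. Check the actual `MergeOperator` trait in the LakeSoul source for its exact signature before adapting this.

```scala
// Stand-in for LakeSoul's MergeOperator trait so this sketch compiles
// without LakeSoul on the classpath. The single-method shape is an
// assumption; the real trait may differ.
trait MergeOperatorSketch[T] extends Serializable {
  // Receives the values written to one field for a given primary key
  // (assumed oldest first) and returns the merged value.
  def mergeData(input: Seq[T]): T
}

// Hypothetical custom rule: keep the largest value ever written.
// Throws on an empty input, which this sketch does not handle.
class MergeOpMaxLong extends MergeOperatorSketch[Long] {
  override def mergeData(input: Seq[Long]): Long = input.max
}

object CustomMergeSketch {
  def main(args: Array[String]): Unit = {
    val op = new MergeOpMaxLong
    println(op.mergeData(Seq(3L, 10L, 7L))) // 10
  }
}
```

With the real trait, such an operator would presumably be registered and applied in the same way as `MergeNonNullOp` in the example above, that is, with `register(spark, "SomeName")` followed by `expr("SomeName(column)")` when reading.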