# Spark API Docs

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

## 1. Create and Write LakeSoulTable

### 1.1 Table Name

The table name in LakeSoul can be a path: the directory where the data is stored serves as the table name of the LakeSoulTable. A table can also have a short name, easier to remember and usable in SQL, that is not a string in the form of a path.

When calling the DataFrame.write.save method to write data to a LakeSoulTable, a new table is automatically created at the storage path if it does not exist. By default the table has no name and can only be accessed through its path; you can add the `option("shortTableName", "table_name")` option to set a table name.

Through DataFrame.write.saveAsTable, a table is created that can be accessed by its table name. The default path is `spark.sql.warehouse.dir`/current_database/table_name, and the table can later be accessed by path or by table name. To customize the table path, add the `option("path", "s3://bucket/...")` option.

When creating a table through SQL, the table name can be either a path or a name; a path must be an absolute path. If a name is used, the path follows the same rules as DataFrame.write.saveAsTable above and can be set through the LOCATION clause of the `CREATE TABLE` SQL. For how to create a primary key partitioned table in SQL, refer to [7. Operate LakeSoulTable by Spark SQL](#7-operate-lakesoultable-by-spark-sql). A minimal sketch of the naming options follows.
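
A minimal sketch of the naming options above (the bucket, paths, and table names here are illustrative assumptions, not official examples):

```scala
import org.apache.spark.sql._

val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .getOrCreate()
import spark.implicits._
val df = Seq(("2021-01-01", 1, "rice")).toDF("date", "id", "name")

// save(): the path identifies the table; shortTableName additionally attaches a name
df.write.mode("append").format("lakesoul")
  .option("shortTableName", "my_table")          // illustrative name
  .save("s3a://bucket-name/table/path")          // illustrative path

// saveAsTable(): the name identifies the table; the path option customizes its location
df.write.format("lakesoul")
  .option("path", "s3a://bucket-name/another/table/path")  // illustrative path
  .saveAsTable("my_table_2")
```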

### 1.2 Metadata Management
LakeSoul manages metadata through an external database, so it can process metadata efficiently, and the metadata cluster can easily be scaled up in the cloud.

### 1.3 Partition
LakeSoulTable supports two kinds of partitioning, range and hash, and they can be used at the same time.
  - A range partition is a common time-based table partition; data files of different partitions are stored in different partition paths.
  - To use a hash partition, you must specify both the hash primary key fields and the hash bucket num; the bucket num determines how rows are hashed into buckets by the primary key fields.
  - If both range and hash partitioning are specified, then within each range partition, rows with the same hash key are written to the file with the same bucket id.
  - When partitioning is specified, data written to LakeSoulTable must contain the partitioning fields.

Depending on the scenario, you can use a range partition, a hash partition, or both. When a hash partition is specified, the data in LakeSoulTable is unique by primary key, which consists of the hash partition fields plus the range partition fields (if any).

When a hash partition is specified, LakeSoulTable supports upsert operations; writing data in append mode is disabled, and the `lakeSoulTable.upsert()` method should be used instead.

### 1.4 Code Examples
```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

// create table
// spark batch
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
// spark streaming
import org.apache.spark.sql.streaming.Trigger
val readStream = spark.readStream.parquet("inputPath")
val writeStream = readStream.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum", "2")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
  .start(tablePath)
writeStream.awaitTermination()

// for an existing table, partition information no longer needs to be specified when writing data
// overwrite is equivalent to INSERT OVERWRITE PARTITION; if the replaceWhere option is not specified, the entire table is overwritten
df.write
  .mode("overwrite")
  .format("lakesoul")
  .option("replaceWhere","date='2021-01-01'")
  .save(tablePath)
```

## 2. Read LakeSoulTable
You can read data with the Spark read API or through a LakeSoulTable handle; Spark SQL is also supported, see [7. Operate LakeSoulTable by Spark SQL](#7-operate-lakesoultable-by-spark-sql).

### 2.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

// by the spark api
val df1 = spark.read.format("lakesoul").load(tablePath)

// by LakeSoulTable
val df2 = LakeSoulTable.forPath(tablePath).toDF
```

## 3. Upsert LakeSoulTable

### 3.1 Batch
Upsert is supported when hash partitioning has been specified.

MergeOnRead is used by default: upsert data is written as delta files, and LakeSoul provides efficient upsert and merge-scan performance.

The parameter `spark.dmetasoul.lakesoul.deltaFile.enabled` can be set to `false` to switch to CopyOnWrite mode, in which fully merged data is generated after each upsert. This mode is not recommended because of its poor performance and low concurrency.
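
A minimal sketch of switching to CopyOnWrite mode, shown only for completeness given the caveats above:

```scala
import org.apache.spark.sql._

val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  // disable delta files so each upsert produces fully merged data (CopyOnWrite); not recommended
  .config("spark.dmetasoul.lakesoul.deltaFile.enabled", "false")
  .getOrCreate()
```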

#### 3.1.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("2021-01-01",3,"chicken")).toDF("date","id","name")

lakeSoulTable.upsert(extraDF)
```

### 3.2 Streaming Support
In streaming, when outputMode is complete, each write overwrites all previous data.

When outputMode is append or update and a hash partition is specified, each write is treated as an upsert; if data with the same primary key already exists at read time, the latest value of a key overrides the previous one. Update mode is available only if a hash partition is specified. Without hash partitioning, duplicate data is allowed. A sketch of an update-mode write follows.
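
A hedged sketch of an update-mode streaming write (the input path and schema are hypothetical, and the table is assumed to already exist with hash partitions as in section 1.4, so each micro-batch is applied as an upsert):

```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

spark.readStream
  .schema("date string, id int, name string") // hypothetical input schema
  .parquet("s3a://bucket-name/input/path")    // hypothetical input path
  .writeStream
  .outputMode("update")
  .format("lakesoul")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
  .start("s3a://bucket-name/table/path/is/also/table/name")
  .awaitTermination()
```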

## 4. Update LakeSoulTable
LakeSoul supports update operations, performed by specifying a condition and expressions for the fields to be updated. There are several ways to perform an update; see the annotations in `LakeSoulTable`.

### 4.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// update(condition, set)
lakeSoulTable.update(col("date") > "2021-01-01", Map("date" -> lit("2021-01-02")))
```

## 5. Delete Data
LakeSoul supports a delete operation to remove data that meets a given condition. The condition can reference any field; if no condition is specified, all data in the table will be deleted.

### 5.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// delete data that meets the condition
lakeSoulTable.delete("date='2021-01-01'")
// delete the full table data
lakeSoulTable.delete()
```

## 6. Compaction
Upsert generates delta files, which can hurt read efficiency once the number of delta files grows too large; at that point, compaction can be performed to merge the files.

When compaction is performed on the full table, you can set conditions so that only the range partitions that meet them are compacted.

Conditions that trigger compaction:
1. The last modification of a range partition was more than `spark.dmetasoul.lakesoul.compaction.interval` milliseconds ago (default: 12 hours)
2. The number of delta files exceeds `spark.dmetasoul.lakesoul.deltaFile.max.num` (default: 5)

### 6.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// compaction on the specified partition
lakeSoulTable.compaction("date='2021-01-01'")
// compaction on all partitions of the table
lakeSoulTable.compaction()
// compaction on all partitions, but only the partitions that meet the trigger conditions are compacted
lakeSoulTable.compaction(false)
// spark sql
spark.sql("call LakeSoulTable.compaction(condition=>map('date','2021-01-01'),tablePath=>'" + tablePath + "')")
spark.sql("call LakeSoulTable.compaction(tableName=>'lakesoul_table_name')")
```

### 6.2 Compaction and Load Partition to Hive
Since version 2.0, LakeSoul supports loading a partition into Hive after compaction.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val lakeSoulTable = LakeSoulTable.forName("lakesoul_test_table")
lakeSoulTable.compaction("date='2021-01-01'", "spark_catalog.default.hive_test_table")
spark.sql("call LakeSoulTable.compaction(tableName=>'lakesoul_table_name',hiveTableName=>'spark_catalog.default.hive_test_table',condition=>map('date','2021-01-01'))")
```

**Note:** If `lakesoul` has been set as the default catalog, Hive tables must be referenced with the `spark_catalog` prefix.

## 7. Operate LakeSoulTable by Spark SQL

LakeSoul supports reading and writing data with Spark SQL. To use it, set `spark.sql.catalog.lakesoul` to `org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog`. You can also make LakeSoul the default catalog by adding the `spark.sql.defaultCatalog=lakesoul` configuration item; a session setup is sketched below.
Be aware that:
   - `INSERT INTO` cannot be used on a hash-partitioned table; use the `MERGE INTO` SQL syntax instead;
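
A minimal session setup for the SQL examples below, using only configuration keys that appear elsewhere in this document:

```scala
import org.apache.spark.sql._

val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  // register the LakeSoul catalog and, optionally, make it the default catalog
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
```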

### 7.1 Code Examples

#### 7.1.1 DDL SQL

```sql
-- To create a primary key table, set the primary key names and the number of hash buckets through TBLPROPERTIES; if not set, a non-primary-key table is created
-- To create a primary key CDC table, add the table property 'lakesoul_cdc_change_column'='change_kind'; please refer to [LakeSoul CDC table](../03-Usage%20Docs/04-cdc-ingestion-table.mdx)
CREATE TABLE default.table_name (id string, date string, data string) USING lakesoul
    PARTITIONED BY (date)
    LOCATION 's3://bucket/table_path'
    TBLPROPERTIES(
      'hashPartitions'='id',
      'hashBucketNum'='2')
```

Adding or dropping columns with ALTER TABLE is also supported, using the same syntax as Spark SQL; changing a column's type is not supported.
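
For illustration, a hedged sketch of the standard Spark SQL forms (the column name `new_col` is hypothetical; the session is assumed to be configured as above):

```scala
// add a column, then drop it again; standard Spark SQL ALTER TABLE syntax
spark.sql("ALTER TABLE default.table_name ADD COLUMNS (new_col string)")
spark.sql("ALTER TABLE default.table_name DROP COLUMN new_col")
```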

#### 7.1.2 DML SQL

```sql
-- INSERT INTO
insert overwrite/into table default.table_name partition (date='2021-01-01') select id from tmpView

-- MERGE INTO
-- For primary key tables, upsert can be implemented through the MERGE INTO statement
-- MATCHED/NOT MATCHED conditional clauses are not currently supported in MERGE INTO
-- The ON clause can only contain equality expressions on primary keys; joins on non-primary-key columns and non-equality expressions are not supported
MERGE INTO default.`table_name` AS t USING source_table AS s
    ON t.hash = s.hash
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
```

**Notice**:
* The database (namespace) name can be added before the table name; it defaults to the database selected by the current `USE`, or to `default` if no `USE database` has been executed
* The table path can be set using the LOCATION clause or the `path` table property; if no path is set, it defaults to `spark.sql.warehouse.dir`/database_name/table_name/
* A LakeSoul table can be read and written by its table path; in SQL, the table name part is written as ``lakesoul.default.`table_path` ``, as in the example below
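
For instance, with the table created in 7.1.1 (path `s3://bucket/table_path`), a hedged sketch of querying by path:

```scala
// the table name part references the lakesoul catalog and the table's path
spark.sql("SELECT * FROM lakesoul.default.`s3://bucket/table_path` WHERE date = '2021-01-01'").show()
```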

## 8. Operators on Hash Primary Keys
When a hash partition is specified, the data in each range partition is bucketed by the hash primary key, and the data within each bucket is ordered. Therefore, some operators need no shuffle or sort when they run on the hash primary key.

LakeSoul currently optimizes join, intersect, and except; more operators will be supported in the future.

### 8.1 Join on Hash Keys
Scenarios:
  - No shuffle or sort is required when data from different range partitions of the same table is joined on the hash keys
  - No shuffle or sort is required when two different tables with the same hash key types and count, and the same hash bucket num, are joined on the hash keys

### 8.2 Intersect/Except on Hash Keys
Scenarios:
  - Intersect/Except on hash keys across different range partitions of the same table requires no shuffle, sort, or distinct
  - Intersect/Except on hash keys across different tables with the same hash key types and count, and the same hash bucket num, requires no shuffle, sort, or distinct

Within a range partition the hash primary keys are unique, so the results of intersect or except contain no duplicates, and subsequent operations need no extra deduplication. For example, you can directly `count` the number of rows without needing `count distinct`.

### 8.3 Code Examples
```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
import spark.implicits._

val df1 = Seq(("2021-01-01",1,1,"rice"),("2021-01-02",2,2,"bread")).toDF("date","id1","id2","name")
val df2 = Seq(("2021-01-01",1,1,2.7),("2021-01-02",2,2,1.3)).toDF("date","id3","id4","price")

val tablePath1 = "s3a://bucket-name/table/path/is/also/table/name/1"
val tablePath2 = "s3a://bucket-name/table/path/is/also/table/name/2"

df1.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id1,id2")
  .option("hashBucketNum","2")
  .save(tablePath1)
df2.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id3,id4")
  .option("hashBucketNum","2")
  .save(tablePath2)

// join on hash keys without shuffle and sort
// different range partitions of the same table
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join
    | (select * from lakesoul.`$tablePath1` where date='2021-01-02') t2
    | on t1.id1=t2.id1 and t1.id2=t2.id2
  """.stripMargin)
  .show()
// different tables with the same hash settings
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join
    | (select * from lakesoul.`$tablePath2` where date='2021-01-01') t2
    | on t1.id1=t2.id3 and t1.id2=t2.id4
  """.stripMargin)
  .show()

// intersect/except on hash keys without shuffle, sort, and distinct
// different range partitions of the same table
spark.sql(
  s"""
    |select count(1) from
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-02') t
  """.stripMargin)
  .show()
// different tables with the same hash settings
spark.sql(
  s"""
    |select count(1) from
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id3,id4 from lakesoul.`$tablePath2` where date='2021-01-01') t
  """.stripMargin)
  .show()
```

## 9. Schema Evolution
LakeSoul supports schema evolution: new columns can be added, but partitioning fields cannot be modified. When a new column is added and existing data is read, the new column reads as NULL. You can fill the new column through an upsert operation.

### 9.1 Merge Schema
Set `mergeSchema` to `true`, or enable `autoMerge`, to merge schemas when writing data. The new schema is the union of the table schema and the schema of the data being written.

### 9.2 Code Examples
```scala
// first way: set the mergeSchema option on the write
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .option("mergeSchema","true")
  .save(tablePath)

// second way: enable autoMerge on the session
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()
```

## 10. Drop Partition
Dropping a partition (that is, a range partition) does not actually delete the data files; the cleanup operation can be used to remove the stale data.

### 10.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// drop the specified range partition
lakeSoulTable.dropPartition("date='2021-01-01'")
```

## 11. Drop Table
Dropping a table directly deletes all of its metadata and files.

### 11.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// drop the table
lakeSoulTable.dropTable()
```