# Spark API Docs

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

## 1. Create and Write LakeSoulTable

### 1.1 Table Name

The table name in LakeSoul can be a path: the directory where the data is stored is then the table name of the LakeSoulTable. In addition, a table can have a short name to help remember it or to access it in SQL; this is a string not in the form of a path.

When the `DataFrame.write.save` method is called to write data to a LakeSoulTable, a new table is automatically created at the storage path if it does not exist. By default such a table has no short name and can only be accessed through the path; you can add the `option("shortTableName", "table_name")` option to set one.

`DataFrame.write.saveAsTable` creates a table that can be accessed by its table name. The default path is `spark.sql.warehouse.dir`/current_database/table_name, and the table can later be accessed by either path or name. To customize the table path, add the `option("path", "s3://bucket/...")` option.

When creating a table through SQL, the table name can be either a path or a name; a path must be an absolute path. For a table name, the path rules are the same as for `DataFrame.write.saveAsTable` above, and the path can be set through the LOCATION clause in the `CREATE TABLE` SQL. For how to create a primary key partitioned table in SQL, refer to [7. Operate LakeSoulTable by Spark SQL](#7-operate-lakesoultable-by-spark-sql).


### 1.2 Metadata Management
LakeSoul manages metadata through an external database, so it can process metadata efficiently, and the meta cluster can easily be scaled up in the cloud.

### 1.3 Partition
LakeSoulTable can be partitioned in two ways, range and hash, which can be used at the same time:
- A range partition is a common time-based table partition. Data files of different partitions are stored under different partition paths.
- To use a hash partition, you must specify both the hash primary key fields and the hash bucket number. The hash bucket number is used to hash the hash primary key fields into buckets.
- If you specify both range and hash partitioning, within each range partition data with the same hash key is written to a file with the same bucket id.
- When partitioning is specified, data written to LakeSoulTable must contain the partitioning fields.

Depending on the scenario, you can choose a range partition, a hash partition, or both. When a hash partition is specified, the data in LakeSoulTable is unique by primary key, which is the hash partition field(s) plus the range partition field(s), if any.

When a hash partition is specified, LakeSoulTable supports upsert operations; writing data in APPEND mode is then disabled, and the `lakeSoulTable.upsert()` method can be used instead.
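Before the full examples in 1.4, here is a minimal sketch of the table-name options described in 1.1; the bucket paths and the short table names are placeholders:

```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val df = Seq(("2021-01-01",1,"rice")).toDF("date","id","name")

//save to a path; shortTableName additionally registers a name for SQL access (placeholder name)
df.write
  .mode("append")
  .format("lakesoul")
  .option("shortTableName","name_test_table")
  .save("s3a://bucket-name/table/name/test/path")

//saveAsTable creates a named table; a custom location can be set via the path option
df.write
  .format("lakesoul")
  .option("path","s3a://bucket-name/custom/table/path")
  .saveAsTable("name_test_table_2")
```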
### 1.4 Code Examples
```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

//create table
//spark batch
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
//spark streaming
import org.apache.spark.sql.streaming.Trigger
val readStream = spark.readStream.parquet("inputPath")
val writeStream = readStream.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 minutes"))
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum", "2")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
  .start(tablePath)
writeStream.awaitTermination()

//for an existing table, there is no need to specify partition information when writing data
//equivalent to INSERT OVERWRITE PARTITION; if the replaceWhere option is not specified, the entire table will be overwritten
df.write
  .mode("overwrite")
  .format("lakesoul")
  .option("replaceWhere","date='2021-01-01'")
  .save(tablePath)
```

## 2. Read LakeSoulTable
You can read data through the Spark API or by building a LakeSoulTable; Spark SQL is also supported, see [7. Operate LakeSoulTable by Spark SQL](#7-operate-lakesoultable-by-spark-sql).

### 2.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

//by spark api
val df1 = spark.read.format("lakesoul").load(tablePath)

//by LakeSoulTable
val df2 = LakeSoulTable.forPath(tablePath).toDF
```

## 3. Upsert LakeSoulTable

### 3.1 Batch
Upsert is supported when hash partitioning has been specified.

MergeOnRead is used by default: upsert data is written as delta files. LakeSoul provides efficient upsert and merge scan performance.

The parameter `spark.dmetasoul.lakesoul.deltaFile.enabled` can be set to `false` to use CopyOnWrite mode, in which fully merged data is generated after each upsert. This mode is not recommended because of its poor performance and low concurrency.

#### 3.1.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("2021-01-01",3,"chicken")).toDF("date","id","name")

lakeSoulTable.upsert(extraDF)
```

### 3.2 Streaming Support
In streaming, when outputMode is complete, each write overwrites all previous data.

When outputMode is append or update and a hash partition is specified, each write is treated as an upsert; if data with the same primary key exists at read time, the latest value of that key overrides earlier ones. Update mode is available only if a hash partition is specified. If no hash partitioning is used, duplicate data is allowed.
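Below is a minimal sketch of a streaming upsert into an existing hash-partitioned table, assuming the table was created as in 1.4; the input path, checkpoint path, and table path are placeholders:

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

//each micro-batch is applied as an upsert; rows with an existing primary key
//override the previous values at read time
spark.readStream.parquet("inputPath")
  .writeStream
  .outputMode("update")  //update mode requires a hash partition
  .trigger(Trigger.ProcessingTime("1 minutes"))
  .format("lakesoul")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path2")
  .start(tablePath)
  .awaitTermination()
```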
## 4. Update LakeSoulTable
LakeSoul supports update operations, performed by specifying the condition and the expression for each field that needs to be updated. There are several ways to perform an update; see the annotations in `LakeSoulTable`.

### 4.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
import org.apache.spark.sql.functions._

//update(condition, set)
lakeSoulTable.update(col("date") > "2021-01-01", Map("date" -> lit("2021-01-02")))
```

## 5. Delete Data
LakeSoul supports a delete operation to delete data that meets the given condition. The condition can reference any field, and if no condition is specified, all data in the table will be deleted.

### 5.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//delete data that meets the condition
lakeSoulTable.delete("date='2021-01-01'")
//delete the full table data
lakeSoulTable.delete()
```

## 6. Compaction
Upsert generates delta files, which can affect read efficiency when the number of delta files grows too large; at that point, compaction can be performed to merge the files.

When compaction is performed on the full table, you can set conditions for compaction, and only range partitions that meet the conditions will be compacted.

Conditions that trigger compaction:
1. The last modification time of a range partition is more than `spark.dmetasoul.lakesoul.compaction.interval` (ms) ago; the default is 12 hours
2. The number of delta files exceeds `spark.dmetasoul.lakesoul.deltaFile.max.num`; the default is 5
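These two thresholds can be adjusted through the Spark session configuration; below is a sketch with illustrative values, not recommendations:

```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  //compact range partitions whose last modification is older than 6 hours (value in ms)
  .config("spark.dmetasoul.lakesoul.compaction.interval", (6 * 60 * 60 * 1000).toString)
  //compact when a partition has accumulated more than 10 delta files
  .config("spark.dmetasoul.lakesoul.deltaFile.max.num", "10")
  .getOrCreate()
```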
### 6.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//compaction on the specified partition
lakeSoulTable.compaction("date='2021-01-01'")
//compaction on all partitions of the table
lakeSoulTable.compaction()
//compaction on all partitions, but only partitions that meet the conditions will be compacted
lakeSoulTable.compaction(false)
//spark sql
spark.sql("call LakeSoulTable.compaction(condition=>map('date','2021-01-01'),tablePath=>'"+tablePath+"')")
spark.sql("call LakeSoulTable.compaction(tableName=>'lakesoul_table_name')")
```

### 6.2 Compaction And Load Partition to Hive
Since version 2.0, LakeSoul supports loading a partition into Hive after compaction.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val lakeSoulTable = LakeSoulTable.forName("lakesoul_test_table")
lakeSoulTable.compaction("date='2021-01-01'", "spark_catalog.default.hive_test_table")
spark.sql("call LakeSoulTable.compaction(tableName=>'lakesoul_table_name',hiveTableName=>'spark_catalog.default.hive_test_table',condition=>map('date','2021-01-01'))")
```

**Note**: If `lakesoul` has been set as the default catalog, Hive tables should be referenced with the `spark_catalog` prefix.

## 7. Operate LakeSoulTable by Spark SQL

LakeSoul supports reading and writing data with Spark SQL. To use it, set `spark.sql.catalog.lakesoul` to `org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog`. You can also set LakeSoul as the default catalog by adding the `spark.sql.defaultCatalog=lakesoul` configuration item.
Be aware that:
- `INSERT INTO` cannot be performed on a hash partitioned table; please use the `MERGE INTO` SQL syntax instead

### 7.1 Code Examples

#### 7.1.1 DDL SQL

```sql
-- To create a primary key table, set the primary key fields and the number of hash buckets through TBLPROPERTIES; if not set, the table is a non-primary key table
-- To create a primary key CDC table, add the table property 'lakesoul_cdc_change_column'='change_kind'; please refer to [LakeSoul CDC table](../03-Usage%20Docs/04-cdc-ingestion-table.mdx)
CREATE TABLE default.table_name (id string, date string, data string) USING lakesoul
    PARTITIONED BY (date)
    LOCATION 's3://bucket/table_path'
    TBLPROPERTIES(
      'hashPartitions'='id',
      'hashBucketNum'='2')
```

Adding or deleting columns with ALTER TABLE is also supported, as sketched below. This part has the same syntax as Spark SQL and does not support changing a column's type.
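A minimal sketch, assuming the table created in 7.1.1 and standard Spark SQL ALTER TABLE syntax; `extra_info` is an illustrative column name:

```sql
-- add a new column; existing rows will read it as NULL
ALTER TABLE default.table_name ADD COLUMNS (extra_info string);
-- delete the column again
ALTER TABLE default.table_name DROP COLUMN extra_info;
```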
#### 7.1.2 DML SQL

```sql
-- INSERT INTO
insert overwrite/into table default.table_name partition (date='2021-01-01') select id from tmpView

-- MERGE INTO
-- For primary key tables, upsert can be implemented through the MERGE INTO statement
-- MATCHED/NOT MATCHED conditional clauses are currently not supported in MERGE INTO
-- The ON clause can only contain equality expressions on primary keys; joins on non-primary-key columns and non-equality expressions are not supported
MERGE INTO default.`table_name` AS t USING source_table AS s
    ON t.hash = s.hash
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
```

**Notice**:
* The database (namespace) name can be added before the table name; it defaults to the database of the current `USE`, or to `default` if no `USE database` has been executed
* The table path can be set using the LOCATION clause or the `path` table property; if no path is set, it defaults to `spark.sql.warehouse.dir`/database_name/table_name/
* You can use the table path to read and write a LakeSoul table; in SQL, the table name part then needs to be written as lakesoul.default.`table_path`

## 8. Operators on Hash Primary Keys
When a hash partition is specified, the data in each range partition is bucketed by the hash primary key, and the data within each bucket is ordered. Therefore, some operators on the hash primary key do not need to shuffle or sort.

LakeSoul currently supports this optimization for join, intersect, and except; more operators will be supported in the future.

### 8.1 Join on Hash Keys
Scenarios:
- Shuffle and sort are not required when data from different partitions of the same table is joined on the hash keys
- If two different tables have hash fields of the same type and number, and the same hash bucket number, no shuffle and sort are needed when they are joined on the hash keys

### 8.2 Intersect/Except on Hash Keys
Scenarios:
- Intersect/Except on hash keys across different partitions of the same table does not require shuffle, sort, or distinct
- Intersect/Except on hash keys across different tables with hash keys of the same type and number, and the same hash bucket number, does not require shuffle, sort, or distinct

Within a range partition, the hash primary keys are unique, so the results of intersect or except contain no duplicates, and subsequent operations do not need to deduplicate again. For example, you can directly `count` the number of rows without needing `count distinct`.
### 8.3 Code Examples
```scala
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
import spark.implicits._


val df1 = Seq(("2021-01-01",1,1,"rice"),("2021-01-02",2,2,"bread")).toDF("date","id1","id2","name")
val df2 = Seq(("2021-01-01",1,1,2.7),("2021-01-02",2,2,1.3)).toDF("date","id3","id4","price")

val tablePath1 = "s3a://bucket-name/table/path/is/also/table/name/1"
val tablePath2 = "s3a://bucket-name/table/path/is/also/table/name/2"

df1.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id1,id2")
  .option("hashBucketNum","2")
  .save(tablePath1)
df2.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id3,id4")
  .option("hashBucketNum","2")
  .save(tablePath2)


//join on hash keys without shuffle and sort
//different range partitions of the same table
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join
    | (select * from lakesoul.`$tablePath1` where date='2021-01-02') t2
    | on t1.id1=t2.id1 and t1.id2=t2.id2
  """.stripMargin)
  .show()
//different tables with the same hash settings
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join
    | (select * from lakesoul.`$tablePath2` where date='2021-01-01') t2
    | on t1.id1=t2.id3 and t1.id2=t2.id4
  """.stripMargin)
  .show()

//intersect/except on hash keys without shuffle, sort, and distinct
//different range partitions of the same table
spark.sql(
  s"""
    |select count(1) from
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    |  select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-02') t
  """.stripMargin)
  .show()
//different tables with the same hash settings
spark.sql(
  s"""
    |select count(1) from
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    |  select id3,id4 from lakesoul.`$tablePath2` where date='2021-01-01') t
  """.stripMargin)
  .show()
```

## 9. Schema Evolution
LakeSoul supports schema evolution: new columns can be added, but partitioning fields cannot be modified. When a new column is added and existing data is read, the new column is NULL. You can fill the new column for existing rows with an upsert operation (see the sketch below).

### 9.1 Merge Schema
Set `mergeSchema` to `true`, or enable `autoMerge`, to merge the schema when writing data. The new schema is the union of the table schema and the schema of the data currently being written.
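As a sketch of the backfill mentioned above, assuming a primary-key table whose schema already gained a `price` column through a mergeSchema write; names and values are illustrative:

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

//rows carrying values for the new price column; the upsert matches existing
//rows on the hash primary key and fills in the new column
val backfillDF = Seq(("2021-01-01",1,"rice",2.7)).toDF("date","id","name","price")
LakeSoulTable.forPath(tablePath).upsert(backfillDF)
```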
### 9.2 Code Examples
```scala
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  //first way
  .option("mergeSchema","true")
  .save(tablePath)

val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  //second way
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()
```

## 10. Drop Partition
Dropping a partition, also known as dropping a range partition, does not actually delete the data files. You can use the cleanup operation to clean up stale data.

### 10.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//drop the specified range partition
lakeSoulTable.dropPartition("date='2021-01-01'")
```

## 11. Drop Table
Dropping a table directly deletes all of the table's metadata and files.

### 11.1 Code Examples

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//drop table
lakeSoulTable.dropTable()
```