Why this blog?
This blog serves as a practical guide for developers and data engineers looking to master DataFrame updates in Snowpark. Learn essential techniques to streamline data operations and ensure data integrity within your Snowflake environment.
Introduction
Data manipulation and transformation are among the most critical operations when working with DataFrames in Snowflake, and updating existing rows is a common part of that work. Snowpark provides several methods for this, one of which is Table.update().
The method modifies rows in a table directly. It returns a named tuple, UpdateResult, which contains the number of rows updated and the number of multi-joined rows updated. Because Table.update() runs inside Snowflake, the data never leaves Snowflake’s managed environment during the update.
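As a rough illustration of how such a result can be read, the sketch below uses a plain Python named tuple as a stand-in for UpdateResult. The field names follow the result comments shown in the demonstrations in this post; the stand-in class itself is an assumption made for illustration and is not the Snowpark class.

```python
from collections import namedtuple

# Stand-in for Snowpark's UpdateResult (illustration only, not the real class).
UpdateResult = namedtuple(
    "UpdateResult", ["rows_updated", "multi_joined_rows_updated"]
)

# Pretend this value came back from a call such as table.update(...).
result = UpdateResult(rows_updated=4, multi_joined_rows_updated=0)

# Fields can be read by name...
print(result.rows_updated)

# ...or unpacked like any other tuple.
rows, multi_joined = result
print(rows, multi_joined)
```

Reading the fields by name tends to be clearer than positional indexing when the result is checked later in a pipeline.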
Syntax
Table.update(<assignments>, <condition>, [<source>])
Parameters
<assignments>
A dictionary of key-value pairs that maps each column to be updated to its new value. Each value can be either a literal or a Column object.
<condition>
The condition that determines which rows are updated. If no condition is specified, all rows are updated.
<source>
Another DataFrame whose data is used to update the current one. The join condition between the two DataFrames must be specified in <condition>.
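To make the roles of the assignments and the condition concrete, here is a small pure-Python model of the behaviour described above. It is not Snowpark code: rows are plain dictionaries, the helper name update_rows is hypothetical, and new values are either literals or functions of the row (standing in for column expressions).

```python
def update_rows(rows, assignments, condition=None):
    """Toy model: apply assignments to every row that satisfies condition.

    A missing condition means every row is updated.
    Returns the number of rows that were matched and updated.
    """
    updated = 0
    for row in rows:
        if condition is None or condition(row):
            # Evaluate all new values against the row as it was before the update.
            new_values = {
                col: (val(row) if callable(val) else val)
                for col, val in assignments.items()
            }
            row.update(new_values)
            updated += 1
    return updated


rows = [
    {"EMP_ID": 1, "SALARY": 24000, "DEPT_ID": 10},
    {"EMP_ID": 2, "SALARY": 17000, "DEPT_ID": 10},
    {"EMP_ID": 3, "SALARY": 9000, "DEPT_ID": 20},
]

# No condition: every row is updated.
all_count = update_rows(rows, {"SALARY": lambda r: r["SALARY"] * 2})

# With a condition: only matching rows are updated.
dept_count = update_rows(
    rows, {"SALARY": 0}, condition=lambda r: r["DEPT_ID"] == 20
)
```

The first call touches all three rows because no condition is given, which is why an unconditional update deserves a second look before it is run against real data.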
Snowpark DataFrame Update Steps
Follow these steps to update a DataFrame in Snowpark using the Table.update() method:
- Create a DataFrame with the desired data using Session.createDataFrame(). The DataFrame could be built from an existing table, data read from a CSV file, or content created within the code.
- Use the DataFrameWriter class to create a temporary table with the DataFrame’s contents.
- Create a DataFrame to read the contents of the temporary table using the Session.table() method.
- Update the contents of the DataFrame created from the temporary table with the Table.update() method.
- Display the contents of the DataFrame using the DataFrame.show() method to verify that the appropriate records have been updated.
Temporary tables only exist within the session in which they were created and are not visible to other users or sessions. Once the session ends, the table is completely purged from the system. Therefore, temporary tables are well-suited for updating DataFrames.
Demonstration of Updating all rows of a DataFrame
STEP-1: Create DataFrame
The following code creates a DataFrame, df_emp, which holds the EMPLOYEES data as shown below:
# Create a DataFrame with employee data
employee_data = [
[1, 'MIKE', 24000, 10],
[2, 'SARA', 17000, 10],
[3, 'JANE', 9000, 20],
[4, 'ROB', 20000, 20]
]
employee_schema = ["EMP_ID", "EMP_NAME", "SALARY", "DEPT_ID"]
df_emp = session.createDataFrame(employee_data, schema=employee_schema)
df_emp.show()
Output:
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |MIKE |24000 |10 |
|2 |SARA |17000 |10 |
|3 |JANE |9000 |20 |
|4 |ROB |20000 |20 |
------------------------------------------------
STEP-2: Create Temporary Table
The following code creates a temporary table named tmp_emp in the Snowflake database using the contents of the df_emp DataFrame.
# Create a temp table
df_emp.write.mode("overwrite").save_as_table("tmp_emp", table_type="temp")
STEP-3: Read Temporary Table
The following code creates a new DataFrame, df_tmp_emp, which reads the contents of the temporary table tmp_emp.
# Create a DataFrame to read contents of temp table
df_tmp_emp = session.table("tmp_emp")
df_tmp_emp.show()
Output:
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |MIKE |24000 |10 |
|2 |SARA |17000 |10 |
|3 |JANE |9000 |20 |
|4 |ROB |20000 |20 |
------------------------------------------------
STEP-4: Update DataFrame
The following code updates all the records of the DataFrame df_tmp_emp by multiplying the DEPT_ID values by 10 and doubling the SALARY amounts.
# Update DEPT_ID and SALARY fields of all records
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import cast
df_tmp_emp.update({"DEPT_ID": cast("DEPT_ID", IntegerType()) * 10, "SALARY": cast("SALARY", IntegerType()) * 2})
# UpdateResult(rows_updated=4, multi_joined_rows_updated=0)
Note that we have used the cast function to convert the DEPT_ID and SALARY fields to Integer type before updating them.
STEP-5: Display Updated DataFrame
The following code displays the contents of the updated DataFrame.
# Display updated DataFrame
df_tmp_emp.show()
Output:
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |MIKE |48000 |100 |
|2 |SARA |34000 |100 |
|3 |JANE |18000 |200 |
|4 |ROB |40000 |200 |
------------------------------------------------
Updating a DataFrame Based on a Condition
The following code updates the salary of all employees belonging to department 100.
# Update the SALARY field of employees where DEPT_ID is 100
df_tmp_emp.update({"SALARY": cast("SALARY", IntegerType()) + 100}, df_tmp_emp["DEPT_ID"] == 100)
# UpdateResult(rows_updated=2, multi_joined_rows_updated=0)
df_tmp_emp.show()
Output:
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |MIKE |48100 |100 |
|2 |SARA |34100 |100 |
|3 |JANE |18000 |200 |
|4 |ROB |40000 |200 |
------------------------------------------------
Updating a DataFrame Based on Data in Another DataFrame
A DataFrame can also be updated based on the data in another DataFrame using the Table.update() method.
The following code updates the employee SALARY in the df_tmp_emp DataFrame where EMP_ID is equal to EMP_ID in another DataFrame, df_salary.
# Update DataFrame based on data in another DataFrame
df_salary = session.createDataFrame([[1, 50000], [2, 35000]], ["EMP_ID", "SALARY"])
df_salary.show()
Output:
-----------------------
|"EMP_ID" |"SALARY" |
-----------------------
|1 |50000 |
|2 |35000 |
-----------------------
df_tmp_emp.update({"SALARY": df_salary["SALARY"]}, df_tmp_emp["EMP_ID"] == df_salary["EMP_ID"], df_salary)
# UpdateResult(rows_updated=2, multi_joined_rows_updated=0)
df_tmp_emp.show()
Output:
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |MIKE |50000 |100 |
|2 |SARA |35000 |100 |
|3 |JANE |18000 |200 |
|4 |ROB |40000 |200 |
------------------------------------------------
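The multi_joined_rows_updated field stayed at 0 in this example because each target row matched at most one source row. The pure-Python sketch below models what that counter is understood to track when the source holds duplicate keys. It rests on several assumptions: the helper update_from_source is hypothetical, rows are plain dictionaries, the model counts a multi-joined row as updated too, and taking the last matching source row is an arbitrary choice made for this sketch, not a statement about which row Snowflake would use.

```python
def update_from_source(target, source, key, column):
    """Toy model of a join-based update on a shared key column.

    Returns (rows_updated, multi_joined_rows_updated).
    """
    rows_updated = 0
    multi_joined = 0
    for row in target:
        matches = [s for s in source if s[key] == row[key]]
        if not matches:
            continue  # No matching source row: leave the target row unchanged.
        if len(matches) > 1:
            multi_joined += 1
        # Arbitrary choice for this sketch: take the last matching source row.
        row[column] = matches[-1][column]
        rows_updated += 1
    return rows_updated, multi_joined


target = [
    {"EMP_ID": 1, "SALARY": 48100},
    {"EMP_ID": 2, "SALARY": 34100},
    {"EMP_ID": 3, "SALARY": 18000},
]
# EMP_ID 2 appears twice in the source, so that target row is multi-joined.
source = [
    {"EMP_ID": 1, "SALARY": 50000},
    {"EMP_ID": 2, "SALARY": 35000},
    {"EMP_ID": 2, "SALARY": 36000},
]

counts = update_from_source(target, source, key="EMP_ID", column="SALARY")
```

A non-zero second count is a useful signal that the source should be de-duplicated on the join key before the update is trusted.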
Updating a DataFrame using Session.sql() Method
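The Session.sql() method in Snowpark runs a SQL statement and returns a new DataFrame representing the result of the query. Because Snowpark DataFrames are evaluated lazily, the statement is only executed when an action such as collect() is called.
Follow these steps to update the data of a DataFrame in Snowpark using the Session.sql() method: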
- Create a DataFrame: Create a DataFrame with the desired data using Session.createDataFrame(). This DataFrame can be built from an existing table, data read from a CSV file, or content created within the code.
- Create a Temporary Table: Use the DataFrameWriter class to create a temporary table with the contents of the DataFrame.
- Update DataFrame Using Session.sql(): Use the Session.sql() method to update the contents of the temporary table.
- Create a DataFrame to Read Contents of Updated Temp Table: Use the Session.table() method to create a DataFrame that reads the contents of the updated temporary table.
- Display Updated DataFrame: Use the DataFrame.show() method to display the contents of the DataFrame and verify that the appropriate records have been updated.
Here’s the code to illustrate these steps:
# Create a DataFrame
employee_data = [
[1, 'MIKE', 24000, 10],
[2, 'SARA', 17000, 10],
[3, 'JANE', 9000, 20],
[4, 'ROB', 20000, 20]
]
employee_schema = ["EMP_ID", "EMP_NAME", "SALARY", "DEPT_ID"]
df_emp = session.createDataFrame(employee_data, schema=employee_schema)
# Create a temporary table
df_emp.write.mode("overwrite").save_as_table("tmp_emp", table_type="temp")
# Update DataFrame using session.sql()
session.sql("UPDATE tmp_emp SET SALARY=70000 WHERE EMP_ID=3").collect()
# [Row(number of rows updated=1, number of multi-joined rows updated=0)]
# Create DataFrame to read contents of updated temp table
df_tmp_emp = session.table("tmp_emp")
# Display updated DataFrame
df_tmp_emp.show()
Output:
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |MIKE |24000 |10 |
|2 |SARA |17000 |10 |
|3 |JANE |70000 |20 |
|4 |ROB |20000 |20 |
------------------------------------------------
In this example, the Session.sql() method updates the SALARY of the employee with EMP_ID 3 to 70000, and the output above reflects the change.
Streamlining Data Management in Snowpark
Updating DataFrames in Snowpark gives you flexible, efficient options for data manipulation. With the Table.update() method, you can apply conditional updates and even update one DataFrame from the data in another. Working against temporary tables keeps your updates session-specific, so they do not interfere with other users’ data. The Session.sql() method offers an alternative for those who prefer SQL syntax. Together, these methods make Snowpark a versatile tool for managing and updating large datasets within a Snowflake environment. Mastering these techniques can help you streamline your data processing workflows and maintain data integrity across your projects.
FAQs
How can I ensure data consistency during updates?
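One option is optimistic locking: add a version column, include the expected version in the update condition, and increment the version on every write. An update based on stale data then matches no rows instead of silently overwriting a newer change.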
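The idea can be sketched in plain Python. This is a toy model rather than Snowpark code: the VERSION column and the helper update_if_unchanged are hypothetical. With Table.update(), the equivalent would presumably be to put the version check in the condition and inspect rows_updated in the result.

```python
def update_if_unchanged(rows, emp_id, expected_version, new_salary):
    """Toy optimistic-locking update: succeeds only if the version still matches.

    Returns the number of rows updated (0 signals a conflict).
    """
    updated = 0
    for row in rows:
        if row["EMP_ID"] == emp_id and row["VERSION"] == expected_version:
            row["SALARY"] = new_salary
            row["VERSION"] += 1  # Bump the version so stale writers are rejected.
            updated += 1
    return updated


rows = [{"EMP_ID": 3, "SALARY": 9000, "VERSION": 1}]

# The first writer read version 1 and updates successfully.
first = update_if_unchanged(rows, emp_id=3, expected_version=1, new_salary=70000)

# A second writer still holding version 1 no longer matches any row.
second = update_if_unchanged(rows, emp_id=3, expected_version=1, new_salary=65000)
```

A count of zero tells the caller to re-read the row and retry, rather than assuming the write went through.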
Does Table.update() offer upsert (insert or update)?
No, Table.update() doesn’t support upsert natively. However, Snowpark’s Table.merge() method can mimic it: a when_matched().update() clause updates the rows that satisfy the join condition, and a when_not_matched().insert() clause inserts the rows that don’t.
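A minimal sketch of such an upsert with Table.merge() is shown below. The function name upsert_salaries and the choice of columns are assumptions made for illustration, and the function expects a Snowpark Table and DataFrame that both have EMP_ID and SALARY columns; it has not been run against a live Snowflake account here.

```python
def upsert_salaries(target, source):
    """Update SALARY for matching EMP_IDs and insert rows for new EMP_IDs.

    `target` is assumed to be a Snowpark Table and `source` a Snowpark
    DataFrame, both with EMP_ID and SALARY columns (hypothetical setup).
    """
    # Imported here so the sketch can be loaded without Snowpark installed.
    from snowflake.snowpark.functions import when_matched, when_not_matched

    return target.merge(
        source,
        target["EMP_ID"] == source["EMP_ID"],
        [
            # Rows present in both: overwrite the salary.
            when_matched().update({"SALARY": source["SALARY"]}),
            # Rows only in the source: insert them as new records.
            when_not_matched().insert(
                {"EMP_ID": source["EMP_ID"], "SALARY": source["SALARY"]}
            ),
        ],
    )
```

With the DataFrames from this post, the call would look like upsert_salaries(df_tmp_emp, df_salary). Columns not listed in the insert clause, such as EMP_NAME and DEPT_ID, would not be populated from the source.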
Can Table.update() be used for nested updates?
While Table.update() doesn’t directly support nested updates, you can achieve a similar effect. Update one DataFrame, then conditionally filter and update another related DataFrame within the same Snowpark session.
Are there techniques for optimizing updates on very large DataFrames?
Absolutely! Techniques like partition pruning (focusing on relevant data partitions), micro-batching (breaking down updates into smaller batches), and asynchronous updates with Snowpipe can significantly improve update performance for massive DataFrames.
Can Table.update() be used with external CDC tools?
Yes! Leverage Snowpipe to ingest CDC data (changes captured from external sources) and create DataFrames in Snowpark. This allows you to process and update Snowflake tables with the latest external data.