SQL to PySpark

A quick guide for moving from SQL to PySpark.

Michael Berk
Towards Data Science


If you know SQL but need to work in PySpark, this post is for you!


Spark is rapidly becoming one of the most widely adopted frameworks for big data processing. But why work in native PySpark instead of SQL?

Well, you don’t have to. PySpark lets you register a DataFrame as a temp view and query it with SQL without sacrificing runtime performance. On the backend, Spark runs the same transformations in exactly the same way regardless of the language. So, if you want to stick to SQL, your code won’t execute any differently.
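For example, here's a minimal sketch of the temp-view approach, assuming an existing SparkSession named spark and a DataFrame df with the columns used later in this post:

# register the DataFrame under a name that SQL can reference
df.createOrReplaceTempView("df")

# run plain SQL against it; the result is just another DataFrame
result = spark.sql("SELECT column_1, column_2 FROM df WHERE column_1 > 5")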

However, when working in the DataFrame API you tend to hit errors earlier, as the query plan is built, whereas with raw SQL strings you often only see them at runtime. If you’re working with large data, the same mistake can surface much earlier in native PySpark.

In this post we will lean on Spark: the Definitive Guide, walking through each clause of a basic SQL query and showing how to duplicate its logic in PySpark.

Without further ado, let’s dive in…

SELECT

Any good SQL query starts with a SELECT statement — it determines what columns will be pulled and whether they should be transformed or renamed.

SQL

SELECT
    column_1,
    CASE WHEN column_2 IS NULL THEN 0 ELSE 1 END AS is_not_null,
    SUM(column_3) OVER (PARTITION BY column_1) AS sum_column_3_over_partition

PySpark

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = (
    df.withColumn(
        "is_not_null",
        F.when(F.col("column_2").isNull(), 0).otherwise(1)
    )
    .withColumn(
        "sum_column_3_over_partition",
        F.sum("column_3").over(Window.partitionBy("column_1"))
    )
)

As shown above, SQL and PySpark have a very similar structure. The df.select() method takes a sequence of column names or column expressions as positional arguments; the example above uses df.withColumn() instead, which adds derived columns alongside the existing ones. Each SQL keyword has a PySpark equivalent, reached via dot notation (e.g. df.method()), pyspark.sql, or pyspark.sql.functions.

Pretty much any SQL SELECT structure is easy to duplicate with a bit of googling of the relevant SQL keywords.

Tip: use df.selectExpr() to run SQL expressions written as plain strings.
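For instance, here's a sketch of the SELECT above written entirely as SQL expression strings:

df.selectExpr(
    "column_1",
    "CASE WHEN column_2 IS NULL THEN 0 ELSE 1 END AS is_not_null",
    "SUM(column_3) OVER (PARTITION BY column_1) AS sum_column_3_over_partition"
)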

FROM

Now our SELECT statements are worthless without a good FROM clause.

SQL

FROM df

PySpark

df

Pretty complex, right?

As shown above, the FROM table is simply the DataFrame you call your transformation methods on.

If you’re accustomed to using CTEs in your SQL code, you can duplicate that logic by assigning a set of transformations to a variable:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = spark.read.table("my_table")

cte_1 = df.withColumn(
    "is_not_null",
    F.when(F.col("column_2").isNull(), 0).otherwise(1)
)

cte_2 = cte_1.withColumn(
    "sum_column_3_over_partition",
    F.sum("column_3").over(Window.partitionBy("column_1"))
)

WHERE

The WHERE clause is underrated and can dramatically improve query time. In PySpark, there are two identical methods that allow you to filter data: df.where() and df.filter().

SQL

WHERE column_2 IS NOT NULL 
AND column_1 > 5

PySpark

df.where("column_2 IS NOT NULL and column_1 > 5")

As you’ll note above, both methods accept SQL strings as well as native PySpark column expressions, so leaning on SQL syntax helps smooth the transition to PySpark. But for readability and earlier error-raising, completely native PySpark should (probably) be the end goal, as sketched below.
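For reference, here's a sketch of the same filter written with column expressions instead of a SQL string:

import pyspark.sql.functions as F

# boolean column expressions combined with & must be wrapped in parentheses
df.where(F.col("column_2").isNotNull() & (F.col("column_1") > 5))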

JOIN

JOINs are another very underrated clause — if you can get really good at joins, the number of bugs in your code decreases dramatically. According to Spark: the Definitive Guide, there are 8 broad categories of joins, some of which include INNER and LEFT OUTER.

We won’t be covering each, but in general PySpark joins follow the below syntax:

<LEFT>.join(<RIGHT>, <JOIN_EXPRESSION>, <JOIN_TYPE>)
  • <LEFT> and <RIGHT> are PySpark DataFrames
  • <JOIN_EXPRESSION> is a boolean comparison between columns in the two DataFrames
  • <JOIN_TYPE> is a string that determines the join type

SQL

FROM table_1
INNER JOIN table_2
ON table_1.x = table_2.y

PySpark

table_1.join(table_2, table_1["x"] == table_2["y"], "inner")

Tip: use <DF>.dropDuplicates().count() == <DF>.count() to check whether you have duplicates in the left, right, or joined tables. These errors are sometimes hard to spot if you’re not looking for them.
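A minimal sketch of that check, reusing the table_1/table_2 join above:

joined = table_1.join(table_2, table_1["x"] == table_2["y"], "inner")

# True means no duplicate rows; False means that table deserves a closer look
print(table_1.dropDuplicates().count() == table_1.count())
print(table_2.dropDuplicates().count() == table_2.count())
print(joined.dropDuplicates().count() == joined.count())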

GROUP BY

Moving on to the more complex SQL concept of grouping, PySpark has very similar syntax to pandas in this realm.

SQL

SELECT
    column_1,
    SUM(column_3) AS col_3_sum
FROM df
GROUP BY 1

PySpark

import pyspark.sql.functions as F

df.groupBy("column_1").agg(F.sum("column_3").alias("col_3_sum"))

There are many different ways to group data in PySpark, but the syntax above is the most versatile. We call .agg() and pass one or more positional arguments that define how to transform the columns. Note that we can chain .alias() to rename the output to something more usable than sum(column_3).

If you memorize this syntax you’ll always be able to make any transformation you want. To be perfectly clear, the syntax is…

df.groupBy(['<col_1>', '<col_2>', ...]).agg(
    F.<agg_func>('<col_3>').alias('<name_3>'),
    F.<agg_func>('<col_4>').alias('<name_4>'),
    ...
)

For a list of aggregation functions and examples of each, check out sparkbyexamples.
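For instance, here's a quick sketch that mixes a few common aggregation functions (the extra output column names are just illustrative):

import pyspark.sql.functions as F

df.groupBy("column_1").agg(
    F.sum("column_3").alias("col_3_sum"),
    F.avg("column_3").alias("col_3_avg"),
    F.countDistinct("column_2").alias("col_2_distinct")
)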

Conclusion

Here we covered the very basics of moving from SQL to PySpark. With the above structure and some help from Google, you can write pretty much any SQL query in native PySpark.

Note that there are lots of SELECT statement keywords, such as CASE, COALESCE, or NVL, all of which can be written using df.selectExpr(). If you want to move to fully native PySpark instead, they’re very straightforward to google.
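For example, a quick sketch reusing column names from the earlier examples (the output names are just illustrative):

df.selectExpr(
    "COALESCE(column_2, 0) AS column_2_filled",
    "CASE WHEN column_1 > 5 THEN 'high' ELSE 'low' END AS column_1_bucket"
)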

Hope this helps and good luck!

Thanks for reading! I’ll be writing 13 more posts that bring academic research to the DS industry. Check out my comment for links to the main source for this post and some useful resources.
