Getting Started With UDF Using Snowpark Python API

Snowflake Python UDF GeoIP Data Engineering MaxMind
Aakash Nand
Senior Data Engineer @ Slalom Build

Snowflake’s Snowpark feature lets you bring your data engineering computations closer to your data in Snowflake. One benefit is cost savings, because Snowpark does not need the data to be copied out of Snowflake.

In this tutorial we will learn how to create a simple GeoIP UDF using the free MaxMind database and the Snowpark Python API.

Step 1: Download GeoIP maxmind binary files

MaxMind’s GeoIP databases are popular for IP address enrichment because they provide a lot of information about an IP address, such as continent, country, latitude, and longitude. Their Enterprise features are described on the MaxMind website. Today we will use their free geolocation databases, which are updated twice weekly, every Tuesday and Friday.

Step 2: Setup

Let's create a demo database, schema, and stage. We will see later why a UDF needs a stage.

CREATE DATABASE SNOWPARK_DEMO;
CREATE SCHEMA SNOWPARK_DEMO.SNOWPARK_GEOIP;
CREATE STAGE IF NOT EXISTS geoip_stage;

Step 3: Upload the GeoIP data to Snowflake stage

In the previous step we created a Snowflake-managed stage called geoip_stage. Snowflake has three types of internal stage:

| Stage Type | Description | Access method |
| --- | --- | --- |
| Table Stage | Default stage created for each table. | @%TABLE_NAME |
| User Stage | Default stage created for each user. Cannot be accessed by other users. | @~ |
| Internal Named Stage | A stage that users create and manage, and that multiple users can share. Commonly used for sharing files within teams. | @STAGE_NAME |

To put data from the local system into an internal named stage, we use the PUT command:

PUT file:///Path/to/your/local/file/GeoLite2-City.mmdb @geoip_stage auto_compress=false overwrite=true;
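
The same upload can also be done from Python instead of SnowSQL. The following is a minimal sketch, assuming an already created Snowpark session; the helper name `upload_geoip_database` is hypothetical and not part of the original tutorial.

```python
def upload_geoip_database(session, local_path, stage="@geoip_stage"):
    """Upload the .mmdb file to the stage, mirroring the PUT command above.

    `session` is assumed to be a snowflake.snowpark.Session.
    """
    # auto_compress=False keeps the file name unchanged (no .gz suffix),
    # so the UDF can later open GeoLite2-City.mmdb under its original name.
    results = session.file.put(
        local_path,
        stage,
        auto_compress=False,
        overwrite=True,
    )
    # Each result describes one uploaded file and its upload status.
    return [(r.target, r.status) for r in results]
```

It would be called as `upload_geoip_database(session, "/Path/to/your/local/file/GeoLite2-City.mmdb")`, with the path replaced by the location of your downloaded file.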

Let's confirm that the file was successfully added to the stage:

aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>ls @geoip_stage;
+--------------------------------+----------+----------------------------------+------------------------------+
| name                           |     size | md5                              | last_modified                |
|--------------------------------+----------+----------------------------------+------------------------------|
| geoip_stage/GeoLite2-City.mmdb | 58034816 | eab2dc525abf09eb35c9cb8dd2a221a6 | Fri, 5 Apr 2024 09:49:17 GMT |
+--------------------------------+----------+----------------------------------+------------------------------+
1 Row(s) produced. Time Elapsed: 0.248s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>

Step 4: Deploy UDF


We can deploy the UDF using the Snowsight UI or a local development environment. In this tutorial we will deploy the UDF from a local development environment.

To deploy to your environment, change the values in config.py to match your Snowflake account:

snowflake_personal_conn_prop = {
    "account": "xxxx.ap-northeast-1.aws",
    "user": "USERNAME",
    "role": "ROLE",
    "password": "PASSWORD",
    "database": "SNOWPARK_DEMO",
    "schema": "SNOWPARK_GEOIP",
    "warehouse": "MY_COMPUTE_WH",
}

The UDF itself is defined and registered from a separate script that imports this configuration:

import sys

from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from snowflake.snowpark import Session
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import StringType  # only used by the alternative registration below

from config import snowflake_personal_conn_prop

session = Session.builder.configs(snowflake_personal_conn_prop).create()
# Make the staged database file and the geoip2 package available to the UDF
session.add_import("@geoip_stage/GeoLite2-City.mmdb")
session.add_packages("geoip2")


@udf(name="get_geoip_info", is_permanent=True, replace=True, stage_location="@geoip_stage")
def get_geoip_info(ip_address: str, attribute: str) -> str:
    # Get the absolute path of the binary database file imported from the stage
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    geoip_database = import_dir + "GeoLite2-City.mmdb"

    with Reader(geoip_database) as reader:
        try:
            response = reader.city(ip_address)
            if attribute.lower() == "country":
                return response.country.name
            elif attribute.lower() == "city":
                return response.city.name
            elif attribute.lower() == "postcode":
                return response.postal.code
            elif attribute.lower() == "lat_and_long":
                lat = response.location.latitude
                long = response.location.longitude
                return f"{lat}, {long}"
            else:
                return "Attribute not recognized"
        except AddressNotFoundError:
            # The IP address is not in the database, so return NULL
            return None


## Alternate way to register the UDF using the register function
# session.udf.register(func=get_geoip_info,
#                      name="get_geoip_info",
#                      return_type=StringType(),
#                      is_permanent=True,
#                      replace=True,
#                      stage_location="@geoip_stage")
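
The UDF finds the database file through `sys._xoptions`, and that key only exists inside Snowflake. A minimal sketch of a path helper that falls back to a local directory, which may make it easier to try the lookup logic on your own machine first; the helper name `resolve_database_path` and the `local_dir` parameter are assumptions for illustration.

```python
import os
import sys

IMPORT_DIRECTORY_NAME = "snowflake_import_directory"


def resolve_database_path(filename, local_dir="."):
    """Return the path of an imported file inside a UDF, or a local path otherwise."""
    # Inside Snowflake the import directory is exposed through sys._xoptions.
    # Outside Snowflake the key is absent, so fall back to local_dir.
    import_dir = sys._xoptions.get(IMPORT_DIRECTORY_NAME, local_dir)
    return os.path.join(import_dir, filename)
```

Inside the UDF, `resolve_database_path("GeoLite2-City.mmdb")` would play the role of the `import_dir + ...` concatenation.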
  • In the UDF above, the attribute to retrieve is passed as the second argument. If you prefer a separate UDF for each attribute, such as geoip_city, geoip_country, and so on, you will need to register each UDF separately.
  • Snowpark supports two types of UDF:

| UDF Type | Description |
| --- | --- |
| Anonymous UDF | This UDF exists only in the current Snowflake session. When the session ends, the UDF is lost. |
| Named UDF | When registered with is_permanent=True, this UDF is persisted in Snowflake storage, hence stage_location="@geoip_stage" is necessary. It can be used in subsequent sessions. |
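
If you do go the one-UDF-per-attribute route, the per-attribute functions can be generated rather than written by hand. This is a rough sketch under assumptions: `make_single_attribute_lookup` is a hypothetical helper, and `lookup` stands for a plain, undecorated Python function with the same `(ip_address, attribute)` signature as the UDF body. Each generated function would still have to be registered under its own name.

```python
def make_single_attribute_lookup(lookup, attribute):
    """Wrap a two-argument lookup(ip_address, attribute) into lookup(ip_address)."""

    def single(ip_address: str) -> str:
        # The attribute is fixed when the wrapper is created.
        return lookup(ip_address, attribute)

    # Give each wrapper a descriptive name such as geoip_city.
    single.__name__ = f"geoip_{attribute}"
    return single


def demo_lookup(ip_address, attribute):
    # Stand-in for the real GeoIP lookup, used only to show the wiring.
    return f"{attribute} of {ip_address}"


geoip_city = make_single_attribute_lookup(demo_lookup, "city")
geoip_country = make_single_attribute_lookup(demo_lookup, "country")
```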

Step 5: See it in action

aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP> SELECT '103.110.24.67' AS IP, GET_GEOIP_INFO('103.110.24.67', 'country') AS country, GET_GEOIP_INFO('103.110.24.67','lat_and_long') AS lat_and_long, GET_GEOIP_INFO('103.110.24.67','city') AS city;
+---------------+---------+-------------------+-------+                         
| IP            | COUNTRY | LAT_AND_LONG      | CITY  |
|---------------+---------+-------------------+-------|
| 103.110.24.67 | Japan   | 35.6893, 139.6899 | Tokyo |
+---------------+---------+-------------------+-------+
1 Row(s) produced. Time Elapsed: 1.523s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>
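
The registered UDF can also be called from Python through the same Snowpark session. A minimal sketch, assuming a Snowpark version whose `session.sql` accepts bind parameters through `params`; the helper name `lookup_country` is hypothetical.

```python
def lookup_country(session, ip_address):
    """Return the country for one IP address using the GET_GEOIP_INFO UDF.

    `session` is assumed to be a snowflake.snowpark.Session in which
    the UDF has already been registered.
    """
    # The IP address is passed as a bind parameter rather than being
    # formatted into the SQL string.
    rows = session.sql(
        "SELECT GET_GEOIP_INFO(?, 'country') AS country",
        params=[ip_address],
    ).collect()
    # The query returns a single row with a single column.
    return rows[0][0]
```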

Step 6: Conclusion

Snowpark’s Python API provides a robust way to implement custom UDFs. It can also read files from Snowflake’s internal stages, as we did with GeoLite2-City.mmdb. Using this UDF we can enrich IP address data, or even anonymise IP addresses while still getting country information. This use case is explained in detail by Felipe Hoffa in "Geolocation with BigQuery: De-identify 76 million IP addresses in 20 seconds".
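
One common way to anonymise an IP address while keeping it useful for a country lookup is to zero its host bits. The sketch below uses only the Python standard library; the helper name `anonymize_ip` and the prefix lengths (/24 for IPv4, /48 for IPv6) are assumptions for illustration, not values taken from this tutorial or the referenced article.

```python
import ipaddress


def anonymize_ip(ip_address, v4_prefix=24, v6_prefix=48):
    """Zero the host bits of an IP address so it no longer identifies one device."""
    ip = ipaddress.ip_address(ip_address)
    prefix = v4_prefix if ip.version == 4 else v6_prefix
    # strict=False lets ip_network accept an address with host bits set
    # and returns the network that contains it.
    network = ipaddress.ip_network(f"{ip}/{prefix}", strict=False)
    return str(network.network_address)


print(anonymize_ip("103.110.24.67"))  # 103.110.24.0
```

The truncated address could then be passed to the UDF in place of the original one, at the cost of less precise city-level results.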

Step 7: References