Snowflake’s Snowpark feature lets you bring your data engineering computations closer to your data in Snowflake. One benefit is cost savings, since Snowpark does not need data to be copied out of Snowflake.
In this tutorial we will learn how to create a simple GeoIP UDF using MaxMind’s free database and the Snowpark Python API.
Step 1: Download the MaxMind GeoIP binary files
MaxMind’s GeoIP databases are popular for IP address enrichment because they provide a lot of information about an IP address, such as continent, country, latitude, and longitude. MaxMind also offers Enterprise features; see their website for details. Today we will use their free geolocation databases, which are updated twice weekly, every Tuesday and Friday. This tutorial uses the GeoLite2-City.mmdb file.
Step 2: Setup
Let’s create a demo database, schema, and stage. We will see later why a UDF needs a stage.
CREATE DATABASE SNOWPARK_DEMO;
CREATE SCHEMA SNOWPARK_DEMO.SNOWPARK_GEOIP;
CREATE STAGE IF NOT EXISTS geoip_stage;
Step 3: Upload the GeoIP data to a Snowflake stage
In the previous step we created a Snowflake-managed stage called geoip_stage. Snowflake has three types of internal stage:
Stage Type | Description | Access method |
---|---|---|
Table Stage | Default stage created for each table. | @%TABLE_NAME |
User Stage | Default stage created for each user. Cannot be accessed by other users. | @~ |
Internal Named Stage | A stage that users create and manage themselves, and that multiple users can access. Commonly used for sharing files within a team. | @STAGE_NAME |
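As a small illustration of the access methods in the table, the sketch below builds the stage reference string for each stage type. The helper name `stage_reference` and its arguments are made up for this example; they are not part of any Snowflake API.

```python
def stage_reference(stage_type, name=None):
    """Return the reference string used to address an internal stage.

    stage_type is one of "table", "user" or "named" (labels chosen for this sketch).
    """
    if stage_type == "user":
        # The user stage has a fixed reference and takes no name
        return "@~"
    if not name:
        raise ValueError("a table or stage name is required")
    if stage_type == "table":
        return f"@%{name}"
    if stage_type == "named":
        return f"@{name}"
    raise ValueError(f"unknown stage type: {stage_type}")


print(stage_reference("named", "geoip_stage"))
```

In this tutorial only the named form, `@geoip_stage`, is used.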
To copy a file from the local system to an internal named stage, we need to use the PUT command:
PUT file:///Path/to/your/local/file/GeoLite2-City.mmdb @geoip_stage auto_compress=false overwrite=true;
Let’s confirm that the file was added to the stage:
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>ls @geoip_stage;
+--------------------------------+----------+----------------------------------+------------------------------+
| name | size | md5 | last_modified |
|--------------------------------+----------+----------------------------------+------------------------------|
| geoip_stage/GeoLite2-City.mmdb | 58034816 | eab2dc525abf09eb35c9cb8dd2a221a6 | Fri, 5 Apr 2024 09:49:17 GMT |
+--------------------------------+----------+----------------------------------+------------------------------+
1 Row(s) produced. Time Elapsed: 0.248s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>
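The same upload can also be scripted from Python. The following is a minimal sketch, assuming you already have a Snowpark `Session` object and that `session.file.put` accepts a plain local path; the wrapper name `upload_geoip_database` is made up for this example.

```python
def upload_geoip_database(session, local_path, stage_location="@geoip_stage"):
    """Upload the .mmdb file uncompressed, mirroring the PUT command above.

    `session` is assumed to be an existing snowflake.snowpark.Session.
    """
    return session.file.put(
        local_path,
        stage_location,
        auto_compress=False,  # keep the binary database file uncompressed
        overwrite=True,       # replace an older copy of the database
    )
```

A call such as `upload_geoip_database(session, "/Path/to/your/local/file/GeoLite2-City.mmdb")` should return one result entry per uploaded file, which you can inspect to check the upload status.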
Step 4: Deploy the UDF
We can deploy a UDF using the Snowsight UI or a local development environment. In this tutorial we will deploy it from a local development environment.
To deploy to your environment, replace the values in config.py with those for your Snowflake account:
snowflake_personal_conn_prop = {
    "account": "xxxx.ap-northeast-1.aws",
    "user": "USERNAME",
    "role": "ROLE",
    "password": "PASSWORD",
    "database": "SNOWPARK_DEMO",
    "schema": "SNOWPARK_GEOIP",
    "warehouse": "MY_COMPUTE_WH",
}
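If you would rather not keep a password in a source file, one option is to build the same dictionary from environment variables. This is only a sketch: the `SNOWFLAKE_*` variable names and the `load_connection_properties` helper are assumptions made for this example, and the defaults are simply the names used in this tutorial.

```python
import os


def load_connection_properties(env=None):
    """Build Snowpark connection properties from environment variables."""
    env = os.environ if env is None else env
    return {
        # Required values: a missing variable raises KeyError
        "account": env["SNOWFLAKE_ACCOUNT"],
        "user": env["SNOWFLAKE_USER"],
        "role": env["SNOWFLAKE_ROLE"],
        "password": env["SNOWFLAKE_PASSWORD"],
        # Optional values fall back to the names used in this tutorial
        "database": env.get("SNOWFLAKE_DATABASE", "SNOWPARK_DEMO"),
        "schema": env.get("SNOWFLAKE_SCHEMA", "SNOWPARK_GEOIP"),
        "warehouse": env.get("SNOWFLAKE_WAREHOUSE", "MY_COMPUTE_WH"),
    }
```

The resulting dictionary has the same keys as the one in config.py, so it can be passed to `Session.builder.configs(...)` in the same way.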
The UDF itself is defined and registered in a separate script that imports config.py:
import sys

from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from snowflake.snowpark import Session
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import StringType  # only needed for the alternative registration below

from config import snowflake_personal_conn_prop

session = Session.builder.configs(snowflake_personal_conn_prop).create()
# Make the staged database file and the geoip2 package available to the UDF
session.add_import("@geoip_stage/GeoLite2-City.mmdb")
session.add_packages("geoip2")


@udf(name="get_geoip_info", is_permanent=True, replace=True, stage_location="@geoip_stage")
def get_geoip_info(ip_address: str, attribute: str) -> str:
    # Get the absolute path of the binary database file imported from the stage
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    geoip_database = import_dir + "GeoLite2-City.mmdb"

    with Reader(geoip_database) as reader:
        try:
            response = reader.city(ip_address)
            requested = attribute.lower()
            if requested == "country":
                return response.country.name
            elif requested == "city":
                # str() turns a missing city into the string "None"
                return str(response.city.name)
            elif requested == "postcode":
                return response.postal.code
            elif requested == "lat_and_long":
                lat = response.location.latitude
                long = response.location.longitude
                return f"{lat}, {long}"
            else:
                return "Attribute not recognized"
        except AddressNotFoundError:
            # The IP address is not present in the database
            return None


## Alternate way to register the UDF using the register function
# session.udf.register(func=get_geoip_info,
#                      name="get_geoip_info",
#                      return_type=StringType(),
#                      is_permanent=True,
#                      replace=True,
#                      stage_location="@geoip_stage")
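The UDF above only handles the case where a valid address is missing from the database. A malformed string is a different failure, and as far as I know it surfaces as a `ValueError` rather than `AddressNotFoundError`. A minimal sketch of a pre-check using only the standard library follows; `normalize_ip` is a name made up for this example, and whether to return NULL for bad input is a design choice rather than something the original UDF does.

```python
import ipaddress
from typing import Optional


def normalize_ip(value: str) -> Optional[str]:
    """Return the canonical form of an IPv4/IPv6 address, or None if it is invalid."""
    try:
        return str(ipaddress.ip_address(value.strip()))
    except (ValueError, AttributeError):
        # ValueError: not a valid address; AttributeError: value is not a string
        return None


print(normalize_ip(" 103.110.24.67 "))
print(normalize_ip("not-an-ip"))
```

Inside the UDF, you could call such a helper first and return `None` early when it yields `None`, so that one bad row does not fail the whole query.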
- In the UDF above, the attribute we want to retrieve is passed as an argument. If you prefer a separate UDF for each attribute, such as geoip_city, geoip_country, and so on, you will need to register each UDF separately.
- Snowpark supports two types of UDF:
UDF Type | Description |
---|---|
Anonymous UDF | Exists only in the current Snowflake session. When the session ends, the UDF is lost. |
Named UDF | Persisted in Snowflake storage, which is why stage_location="@geoip_stage" is necessary. It can be used in subsequent sessions. |
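Returning to the first note above: whether you keep one UDF with an attribute argument or register one UDF per attribute, the lookup logic can be shared. The sketch below shows one way to do that with a dispatch table. `EXTRACTORS` and `extract_attribute` are names made up for this example, and the `SimpleNamespace` object only stands in for a real geoip2 response so the snippet runs without the database.

```python
from types import SimpleNamespace
from typing import Callable, Dict, Optional

# Map each supported attribute name to a function that reads it from the response
EXTRACTORS: Dict[str, Callable[[object], Optional[str]]] = {
    "country": lambda r: r.country.name,
    "city": lambda r: r.city.name,
    "postcode": lambda r: r.postal.code,
    "lat_and_long": lambda r: f"{r.location.latitude}, {r.location.longitude}",
}


def extract_attribute(response, attribute: str) -> Optional[str]:
    """Look up one attribute on a city response, ignoring case."""
    extractor = EXTRACTORS.get(attribute.lower())
    if extractor is None:
        return "Attribute not recognized"
    return extractor(response)


# Stand-in for the object returned by reader.city(...), for illustration only
fake_response = SimpleNamespace(
    country=SimpleNamespace(name="Japan"),
    city=SimpleNamespace(name="Tokyo"),
    postal=SimpleNamespace(code=None),
    location=SimpleNamespace(latitude=35.6893, longitude=139.6899),
)
print(extract_attribute(fake_response, "Country"))
```

A per-attribute UDF such as geoip_city would then be a thin wrapper that calls the shared lookup with a fixed attribute name.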
Step 5: See it in action
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP> SELECT '103.110.24.67' AS IP, GET_GEOIP_INFO('103.110.24.67', 'country') AS country, GET_GEOIP_INFO('103.110.24.67','lat_and_long') AS lat_and_long, GET_GEOIP_INFO('103.110.24.67','city') AS city;
+---------------+---------+-------------------+-------+
| IP | COUNTRY | LAT_AND_LONG | CITY |
|---------------+---------+-------------------+-------|
| 103.110.24.67 | Japan | 35.6893, 139.6899 | Tokyo |
+---------------+---------+-------------------+-------+
1 Row(s) produced. Time Elapsed: 1.523s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>
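In practice you would usually apply the UDF to a column rather than to a literal. The sketch below builds such a query as a string; the table name `ACCESS_LOGS`, the column name `IP`, and the helper `enrich_table_sql` are hypothetical and only serve the example. The identifiers are interpolated directly, so they should come from trusted code, not from user input.

```python
def enrich_table_sql(table_name: str, ip_column: str = "IP") -> str:
    """Build a query that adds country and city columns using the GeoIP UDF."""
    return (
        f"SELECT {ip_column}, "
        f"GET_GEOIP_INFO({ip_column}, 'country') AS country, "
        f"GET_GEOIP_INFO({ip_column}, 'city') AS city "
        f"FROM {table_name}"
    )


# ACCESS_LOGS is a hypothetical table with an IP column
print(enrich_table_sql("ACCESS_LOGS"))
```

With an open Snowpark session, the string can be run with `session.sql(enrich_table_sql("ACCESS_LOGS")).show()`.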
Step 6: Conclusion
Snowpark’s Python API provides a robust way to implement custom UDFs. It can also read files from Snowflake’s internal stages, as we did with GeoLite2-City.mmdb. Using this UDF we can enrich IP address data, or even anonymise IP addresses while still retaining country information. This use case is explained in detail by Felipe Hoffa in Geolocation with BigQuery: De-identify 76 million IP addresses in 20 seconds.
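As a rough illustration of the anonymisation idea, one common approach is to zero out the host part of the address before storing it. This is a sketch under my own assumptions, not necessarily the method used in the article mentioned above: the function name `truncate_ip` and the prefix lengths (/24 for IPv4, /48 for IPv6) are arbitrary choices for the example.

```python
import ipaddress


def truncate_ip(value: str, v4_prefix: int = 24, v6_prefix: int = 48) -> str:
    """Return the network address of the given IP, dropping the host bits."""
    ip = ipaddress.ip_address(value)
    prefix = v4_prefix if ip.version == 4 else v6_prefix
    # strict=False allows host bits to be set in the input address
    network = ipaddress.ip_network(f"{ip}/{prefix}", strict=False)
    return str(network.network_address)


print(truncate_ip("103.110.24.67"))
```

A truncated address will often still resolve to the same country, but that depends on how the database partitions address ranges, so it is worth checking on your own data.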