メインコンテンツへスキップ
  1. Blogs/

Snowpark Python API を使用した UDF の入門

··405 文字·2 分· ·
Snowflake Python UDF GeoIP Data Engineering MaxMind

SnowflakeのSnowPark機能により、データエンジニアリングの計算をSnowflake内のデータに近づけることができます。Snowpark は Snowflake からデータをコピーする必要がないため、コスト削減などの利点があります。

このチュートリアルでは、Maxmind 無料データベースと SnowPark Python API を使用して、シンプルな GeoIP UDF を作成する方法を学びましょう。

Step 1: GeoIP maxmind バイナリファイルのダウンロード
#

MaxMind の GeoIP データベースは、IP アドレスから大陸、国、緯度、経度など多くの情報を提供するため、IP アドレスからのデータエンリッチメントに人気があります。 エンタプライズバージョンの情報はここに書いてありますが、今日毎週火曜日と金曜日に更新している無料のgeolocationデータベースを使えます。

Step2: Snowflakeの準備
#

デモ・データベース、スキーマ、ステージを作成しよう。なぜUDFのためにステージを作成する必要があるのかは後で説明します。

CREATE DATABASE SNOWPARK_DEMO;
CREATE SCHEMA SNOWPARK_DEMO.SNOWPARK_GEOIP;
CREATE STAGE IF NOT EXISTS geoip_stage;

Step 3: GeoIP データベースファイルを Snowflake ステージにアップロードする
#

前のステップでは、geoip_stageというSnowflakeのマネージドステージを作成しました。Snowflakeには3種類の内部ステージがあります。

ステージ タイプ説明アクセス方法
テーブル ステージテーブル毎にSnowflakeは自動的に作成したステージ@%TABLE_NAME
ユーザ ステージ各ユーザーのデフォルトステージ。 他のユーザーがアクセスすることはできません。@~
インターナル ネムド ステージユーザーで作成と管理できるステージ。 チーム内でのファイル共有などによく使われる。@STAGE_NAME

ローカルシステムから インターナル ネムド ステージ にデータを入れるには、PUT コマンド使えます。

PUT file:///Path/to/your/local/file/GeoLite2-City.mmdb @geoip_stage auto_compress=false overwrite=true;

ステージへのファイルのアップロードに成功したかどうか確認してみましょう。

aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>ls @geoip_stage;
+--------------------------------+----------+----------------------------------+------------------------------+
| name                           |     size | md5                              | last_modified                |
|--------------------------------+----------+----------------------------------+------------------------------|
| geoip_stage/GeoLite2-City.mmdb | 58034816 | eab2dc525abf09eb35c9cb8dd2a221a6 | Fri, 5 Apr 2024 09:49:17 GMT |
+--------------------------------+----------+----------------------------------+------------------------------+
1 Row(s) produced. Time Elapsed: 0.248s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>

Step 4: UDFをデプロイする
#

image

UDFはSnowSight UIまたはローカル開発環境を使用してデプロイできます。このチュートリアルでは、ローカル開発環境を使用してUDFをデプロイします。

自分の環境にデプロイするには、config.py で Snowflake アカウントの値を変更してください。

1snowflake_personal_conn_prop = {
2   "account": "xxxx.ap-northeast-1.aws",
3   "user": "USERNAME",
4   "role": "ROLE",
5   "password": "PASSWORD",
6   "database": "SNOWPARK_DEMO",
7   "schema": "SNOWPARK_GEOIP",
8   "warehouse": "MY_COMPUTE_WH",
9}
 1from geoip2.database import Reader
 2from snowflake.snowpark import Session
 3from snowflake.snowpark.functions import udf
 4from snowflake.snowpark.types import StringType
 5from config import snowflake_personal_conn_prop
 6import sys
 7import geoip2
 8
 9session=Session.builder.configs(snowflake_personal_conn_prop).create()
10session.add_import('@geoip_stage/GeoLite2-City.mmdb')
11session.add_packages('geoip2')
12
13@udf(name="get_geoip_info",is_permanent=True, replace=True,stage_location="@geoip_stage")
14def get_geoip_info(ip_address: str, attribute: str) -> str:
15    #Get absolute path of binary database file in Snowflake stage
16    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
17    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
18    geoip_database= import_dir + 'GeoLite2-City.mmdb'
19
20    with Reader(geoip_database) as reader:
21        try:    
22            response=reader.city(ip_address)
23            if attribute.lower() == "country":
24                return response.country.name
25            elif attribute.lower() == "city":
26                return str(response.city.name)
27            elif attribute.lower() == "postcode":
28                return response.postal.code
29            elif attribute.lower() == "lat_and_long":
30                lat = response.location.latitude
31                long = response.location.longitude
32                return f"{lat}, {long}"
33            else:
34                return "Attribute not recognized"
35        except geoip2.errors.AddressNotFoundError as e:
36            return None
37
38## Alternate way to register UDF using register function
39# session.udf.register(func=get_geoip_info, 
40#                      name="get_geoip_info",
41#                      return_type= StringType(),
42#                      is_permanent=True, 
43#                      replace=True,
44#                      stage_location="@geoip_files")
  • 上記のUDFでは、取得したい引数も渡される。もし geoip_citygeoip_country などの引数ごとに別々の UDF を作成したい場合は、それぞれ別の UDF を登録する必要があります。
  • Snowparkは2種類のUDFをサポートしています。
UDF TypeDescription
Anonymous UDFTこのUDFは現在のSnowflake Sessionに永続化されます。セッションが終了すると、UDFは失われます。
Named UDFこのUDFはSnowflakeストレージに永続化されるため、stage_location="@geoip_stage"が必要である。このUDFは以降のセッションでも使用できます。

Step 5: UDFを試してみる
#

aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP> SELECT '103.110.24.67' AS IP, GET_GEOIP_INFO('103.110.24.67', 'country') AS country, GET_GEOIP_INFO('103.110.24.67','lat_and_long') AS lat_and_long, GET_GEOIP_INFO('103.110.24.67','city') AS city;
+---------------+---------+-------------------+-------+                         
| IP            | COUNTRY | LAT_AND_LONG      | CITY  |
|---------------+---------+-------------------+-------|
| 103.110.24.67 | Japan   | 35.6893, 139.6899 | Tokyo |
+---------------+---------+-------------------+-------+
1 Row(s) produced. Time Elapsed: 1.523s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>

Step 6: まとめ
#

Snowparkのpython APIはカスタムUDFを実装するための堅牢な方法を提供しています。また、内部ステージからGeoLite2-City.mmdbを読み込むように、Snowflakeの内部ステージからファイルを読み込む機能も提供しています。このUDFを使用すると、IPアドレスデータをリッチ化したり、IPアドレスを匿名化して国情報を取得することもできます。このユースケースについては、Felipe HoffaさんがGeolocation with BigQuery: De-identify 76 million IP addresses in 20 secondsで詳しく説明しています。

Step 7: 参考
#