SnowflakeのSnowPark機能により、データエンジニアリングの計算をSnowflake内のデータに近づけることができます。Snowpark は Snowflake からデータをコピーする必要がないため、コスト削減などの利点があります。
このチュートリアルでは、Maxmind 無料データベースと SnowPark Python API を使用して、シンプルな GeoIP UDF を作成する方法を学びましょう。
Step 1: GeoIP maxmind バイナリファイルのダウンロード#
MaxMind の GeoIP データベースは、IP アドレスから大陸、国、緯度、経度など多くの情報を提供するため、IP アドレスからのデータエンリッチメントに人気があります。 エンタプライズバージョンの情報はここに書いてありますが、今日毎週火曜日と金曜日に更新している無料のgeolocationデータベースを使えます。
Step2: Snowflakeの準備#
デモ・データベース、スキーマ、ステージを作成しよう。なぜUDFのためにステージを作成する必要があるのかは後で説明します。
CREATE DATABASE SNOWPARK_DEMO;
CREATE SCHEMA SNOWPARK_DEMO.SNOWPARK_GEOIP;
CREATE STAGE IF NOT EXISTS geoip_stage;
Step 3: GeoIP データベースファイルを Snowflake ステージにアップロードする#
前のステップでは、geoip_stage
というSnowflakeのマネージドステージを作成しました。Snowflakeには3種類の内部ステージがあります。
ステージ タイプ | 説明 | アクセス方法 |
---|---|---|
テーブル ステージ | テーブル毎にSnowflakeは自動的に作成したステージ | @%TABLE_NAME |
ユーザ ステージ | 各ユーザーのデフォルトステージ。 他のユーザーがアクセスすることはできません。 | @~ |
インターナル ネムド ステージ | ユーザーで作成と管理できるステージ。 チーム内でのファイル共有などによく使われる。 | @STAGE_NAME |
ローカルシステムから インターナル ネムド ステージ
にデータを入れるには、PUT
コマンド使えます。
PUT file:///Path/to/your/local/file/GeoLite2-City.mmdb @geoip_stage auto_compress=false overwrite=true;
ステージへのファイルのアップロードに成功したかどうか確認してみましょう。
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>ls @geoip_stage;
+--------------------------------+----------+----------------------------------+------------------------------+
| name | size | md5 | last_modified |
|--------------------------------+----------+----------------------------------+------------------------------|
| geoip_stage/GeoLite2-City.mmdb | 58034816 | eab2dc525abf09eb35c9cb8dd2a221a6 | Fri, 5 Apr 2024 09:49:17 GMT |
+--------------------------------+----------+----------------------------------+------------------------------+
1 Row(s) produced. Time Elapsed: 0.248s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>
Step 4: UDFをデプロイする#
UDFはSnowSight UIまたはローカル開発環境を使用してデプロイできます。このチュートリアルでは、ローカル開発環境を使用してUDFをデプロイします。
自分の環境にデプロイするには、config.py
で Snowflake アカウントの値を変更してください。
1snowflake_personal_conn_prop = {
2 "account": "xxxx.ap-northeast-1.aws",
3 "user": "USERNAME",
4 "role": "ROLE",
5 "password": "PASSWORD",
6 "database": "SNOWPARK_DEMO",
7 "schema": "SNOWPARK_GEOIP",
8 "warehouse": "MY_COMPUTE_WH",
9}
1from geoip2.database import Reader
2from snowflake.snowpark import Session
3from snowflake.snowpark.functions import udf
4from snowflake.snowpark.types import StringType
5from config import snowflake_personal_conn_prop
6import sys
7import geoip2
8
9session=Session.builder.configs(snowflake_personal_conn_prop).create()
10session.add_import('@geoip_stage/GeoLite2-City.mmdb')
11session.add_packages('geoip2')
12
13@udf(name="get_geoip_info",is_permanent=True, replace=True,stage_location="@geoip_stage")
14def get_geoip_info(ip_address: str, attribute: str) -> str:
15 #Get absolute path of binary database file in Snowflake stage
16 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
17 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
18 geoip_database= import_dir + 'GeoLite2-City.mmdb'
19
20 with Reader(geoip_database) as reader:
21 try:
22 response=reader.city(ip_address)
23 if attribute.lower() == "country":
24 return response.country.name
25 elif attribute.lower() == "city":
26 return str(response.city.name)
27 elif attribute.lower() == "postcode":
28 return response.postal.code
29 elif attribute.lower() == "lat_and_long":
30 lat = response.location.latitude
31 long = response.location.longitude
32 return f"{lat}, {long}"
33 else:
34 return "Attribute not recognized"
35 except geoip2.errors.AddressNotFoundError as e:
36 return None
37
38## Alternate way to register UDF using register function
39# session.udf.register(func=get_geoip_info,
40# name="get_geoip_info",
41# return_type= StringType(),
42# is_permanent=True,
43# replace=True,
44# stage_location="@geoip_files")
- 上記のUDFでは、取得したい引数も渡される。もし
geoip_city
、geoip_country
などの引数ごとに別々の UDF を作成したい場合は、それぞれ別の UDF を登録する必要があります。 - Snowparkは2種類のUDFをサポートしています。
UDF Type | Description |
---|---|
Anonymous UDF | TこのUDFは現在のSnowflake Sessionに永続化されます。セッションが終了すると、UDFは失われます。 |
Named UDF | このUDFはSnowflakeストレージに永続化されるため、stage_location="@geoip_stage" が必要である。このUDFは以降のセッションでも使用できます。 |
Step 5: UDFを試してみる#
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP> SELECT '103.110.24.67' AS IP, GET_GEOIP_INFO('103.110.24.67', 'country') AS country, GET_GEOIP_INFO('103.110.24.67','lat_and_long') AS lat_and_long, GET_GEOIP_INFO('103.110.24.67','city') AS city;
+---------------+---------+-------------------+-------+
| IP | COUNTRY | LAT_AND_LONG | CITY |
|---------------+---------+-------------------+-------|
| 103.110.24.67 | Japan | 35.6893, 139.6899 | Tokyo |
+---------------+---------+-------------------+-------+
1 Row(s) produced. Time Elapsed: 1.523s
aakash.nand#MY_COMPUTE_WH@AAKASH.SNOWPARK_GEOIP>
Step 6: まとめ#
Snowparkのpython APIはカスタムUDFを実装するための堅牢な方法を提供しています。また、内部ステージからGeoLite2-City.mmdb
を読み込むように、Snowflakeの内部ステージからファイルを読み込む機能も提供しています。このUDFを使用すると、IPアドレスデータをリッチ化したり、IPアドレスを匿名化して国情報を取得することもできます。このユースケースについては、Felipe HoffaさんがGeolocation with BigQuery: De-identify 76 million IP addresses in 20 secondsで詳しく説明しています。