LakeFormation と GlueStudio で S3 を使った簡単データレイク環境構築
LakeFormation と GlueStudio を使ってデータレイク環境を構築する
データ配置先の作成
データレイク環境を作り、そこに溜め込みたいデータ配置環境を用意する
今回はデータ配置先としてS3を使う
- インプットデータ配置先:S3(dp-datalake-dev)
- 取り込みデータベース :S3(dp-output-dev)
LakeFormationの設定
作成したデータレイク環境へのアクセス制御として IAM があるが、IAM ではサービス単位でしかアクセス制御が行えないため
テーブルやデータベース単位での細かなアクセス制御や、ダッシュボードを通して Glue、GlueStudioで作成した各種環境を一覧的に見ることが出来る
IAM上でサービスに対してFullの権限が与えられていてもLakeFormation上で権限が無いと操作できないように制御可能
LakeFormation管理者権限の設定
LakeFormation上の管理者権限、データベースの作成削除からあらゆる権限操作が可能となる
CatalogDBの作成
作成したS3のデータ配置先をカタログDBとして登録
▼インプットデータDB
▼取り込み後データDB
カタログデータベースに対する権限設定
カタログデータベースに対してどの IAM、ロールがどのような権限でアクセス可能か定義する
IAMアクセス制御の解除
IAMによるデータベース等へのアクセスコントロールを解除
データレイクロケーションの登録
データ配置先となるS3を登録することで「AWSServiceRoleForLakeFormationDataAccess」ロールに対して適切な権限が設定される
GlueStudioでのETL設定
集約したデータファイルをETLで整形、フィルターを行い蓄積用データベースに集約するためのETLジョブを作成する
ここでは視覚的にわかりやすい「Visual with a blank canvas」でジョブを作成する
事前にインプット用S3に必要なデータを配置しておく
ETLジョブの作成
インプット用S3の特定のディレクトリ配下に配置されたCSVファイルを取り込むためのジョブを作成する
取り込むデータ形式に合わせて定義
出力先、データレイクとして蓄積するデータベースを指定
この場合出力先のS3をデータベースとして利用し、Athena でクエリ操作できるようにパーティションの設定も行える
実行ロールの設定
Glueジョブのジョブ名、実行ロールを指定
ロールはデフォルトのものを利用
設定したロールに対して LakeFromation上での権限も付与する
一回ではロールに対して database 権限がつかないため all table 権限と2回実行する
ジョブの実行
Glue Studio のジョブ画面から作成したジョブを実行すると自動で S3 にパーティション毎にデータが配置され
LakeFormation上にもテーブルが作成され、データの中身について Athena で確認できる
ETLによるデータの加工
例)特定のカラムデータを timestamp 型に修正し、新規カラムにコピーする
基本の処理に加え Transform Action を追加し、Pyspark スクリプトを記述できる
from awsglue.dynamicframe import DynamicFrame
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, length, substring, to_date
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window
# DataFrameへ変換
data_df = dfc.select(list(dfc.keys())[0]).toDF()
# データ追加
data_df = data_df.withColumn('master_update_date', F.from_unixtime(F.unix_timestamp('update_date','yyyy/MM/dd HH:mm:ss')).cast('timestamp'))
data_df.show()
# DynamicFrameへ変換
output_df = DynamicFrame.fromDF(data_df, glueContext, 'output_df')
return DynamicFrameCollection({"CustomTransform0": output_df}, glueContext)
ここでは特定のカラムを timestamp 形式に変換し 新規カラムを追加してデータをコピーしている
カラムの追加は Outputschema から Edit で Add root culumm から追加する
スクリプトにて変更を加えたデータ構造を DynamicFrameCollection として Glue に返し
Transform Select From Collection にて全てをデータとして受け取り 出力先のS3に渡している