MAGAZINE

ルーターマガジン

AWS

MySQLに保存されたデータをスクリプト一発でAWS Athenaで集計する

2021.11.17
Pocket

こんにちは、学生アルバイトのkyonoです。

さてAWSにはAthenaという、標準SQLで簡単にS3内のデータを分析できるサービスがあります。 「大型データセットをすばやく、簡単に分析できる」というのが売りで、多くの場合数秒で結果が出てくるそうです1。 MySQLなどで大規模データを集計しようとすると、データが大きければ大きいほど時間がかかり、joinやorder byなどしようものなら更に酷いことになる。かと言って集計のためだけにインデックスを張るのも勿体ない気がする。 そんな時にAthenaの力を借りられれば、きっと快適な分析ライフが待っているはず……

しかし、MySQLに甘え甘やかされ尽くした私に待っていたのは、まさに艱難辛苦だったのです。

aws-sdk-athenaのセットアップ

単発でAthenaを使う場合には、AWSマネジメントコンソール上からGUIを使ってS3上のCSVからCREATE TABLEしてみたり、SELECT文を出したりすることが多いですが、今回は「MySQLからCSVを出力しAthena上のテーブル作成から一気に集計までスクリプトで自動化する」ことを目指すために、aws-sdk-athenaというgemを用いてRubyからAthenaを操作しています。

適当なクラス名でクラスを作り、その中でaws-sdk-athenaのクライアントをinitしましょう。 下のコードでは序でにs3のセットアップもしています。


class AWSOperator
    def initialize
    @bucket = 'bucket-name'
    @s3 = Aws::S3::Resource.new(
        profile: 'profile-name'
    )
    @athena = Aws::Athena::Client.new(
        profile: 'profile-name'
    )
    end
end

巷に溢れるブログでは、aws-sdkのinitの際に:access_key_idと:secret_access_keyを指定するやり方を書いているものも多いですが、gitに上げて公開したりすることを考えると、これらをハードコーディングするのは以ての外です。 私はこれで上司に怒られました。艱難辛苦のゴングが鳴り響きます。 それらについては環境変数でうまく隠すか、上記のように名前付きプロファイルで指定してあげるようにしましょう。

MySQLから必要なデータを取ってきてCSVダンプし、Athenaのテーブルを作成する

AthenaではS3にアップロードされているCSVやJSONなどからテーブルを作ることができます。 今回はCSV形式にしましょう。 ActiveRecordで必要なカラムだけをSELECTし、CSVにダンプします。 この時ActiveRecord::Baseのcolumns_hashメソッドを使ってカラム名とその型を記録しておきましょう。後で使います。


    column_to_type = DesignatedTable.columns_hash.map { |k,v| [k,v.type] }.to_h
    # => {"id"=>:integer, "name"=>:string, "created_at"=>:datetime, ...}

使うカラムだけ選んで保存しておいても良いでしょう。

そうしたらそれを元にCREATE TABLEしましょう。 aws-sdk-athenaのstart_query_executionメソッドでクエリを実行させることができます。 locationではCSVそのものではなく、CSVの入ったディレクトリのパスを指定します。 クエリの実行結果の詳細は対話的に返ってくるのではなく、output_locationにファイルとして保存されます。

query_execution_id = @athena.start_query_execution({
    query_string: " \
    create external table if not exists default.`table_name` ( \
        #{columns(column_to_type)} \
    ) \
    row format delimited \
    fields terminated by ',' \
    lines terminated by '\n' \
    location 's3://#{@bucket}/csv-data-directory/' \
    tblproperties ('skip.header.line.count'='1'); \
    ",
    result_configuration: {
    output_location: "s3://#{@bucket}/result_csv/"
    }
}).query_execution_id

さて、このソースコードではAthenaでのカラムをハードコーディングするのではなく、columnsというメソッドを用い、先程のカラムと型のハッシュを引数として展開しています。 その実装は下のようになっています。

def columns(column_to_type)
    sqltype_to_athenatype = {
    :binary => 'BOOLEAN',
    :boolean => 'BOOLEAN',
    :date => 'DATE',
    :datetime => 'TIMESTAMP',
    :decimal => 'DECIMAL',
    :float => 'FLOAT',
    :integer => 'INT',
    :string => 'STRING',
    :text => 'STRING',
    :time => 'STRING',
    :timestamp => 'TIMESTAMP'
    }
    retval = ''
    column_to_type.each do |k,v|
    retval += "`#{k}` #{sqltype_to_athenatype[v]},\n"
    end

    retval[0..-3]
end

MySQLのデータ型とAthenaのデータ型は異なり、またActiveRecordのそれも両者と異なっています。 MySQLのノリで書くと痛い目に遭います。私は遭いました。 23を見ながらうまく対応させましょう。

またActiveRecordのレコードからAthenaのクエリ文を生成する上でまた面倒なことがあります。 AthenaでのTIMESTAMPの形式はYYYY-MM-DD HH:MM:SS.fffffffffです。 MySQLをSequel Proなどで見た時、DATETIME型のカラムの形式はこれに合っているように見えますが、ActiveRecordでSELECTすると2021-05-23 10:43:50.000000000 +0900のようになります。 タイムゾーン表記が邪魔をして、Athena上で登録できず、エラーすら吐かないまま空白になってしまうので、gsubなどで取り除いておきましょう。
エラーを吐いてほしい。

start_query_executionで実行したら、AWSサーバ側で実行が完了するまで待ってやる必要があります。 そのために必要なのが最初のコードに出てきたquery_execution_idです。 これを引数に取って、get_query_executionでクエリの実行状態を監視します。 何か異常終了があれば、stop_query_executionでクエリを停止します。 get_query_executionの返り値の詳しい構造については4を参照してください。

def wait_for_completion(query_execution_id)
    loop do
    sleep 1
    status = @athena.get_query_execution({ query_execution_id: query_execution_id }).query_execution.status

    case status.state
        when 'FAILED', 'CANCELLED'
        STDERR.puts status.state_change_reason
        exit 1
        when 'SUCCEEDED'
        break
        when 'QUEUED', 'RUNNING'
        nil
        else
        STDERR.puts "Undefined status found"
        @athena.stop_query_execution({query_execution_id: query_execution_id})
        break
    end
    end
end

sleepはちゃんと入れましょう。あまり高頻度でリクエストしすぎると、Rate exceededというエラーが返ってきます。

これで無事にMySQLからSELECTしたデータをAthenaに移せました。

作成したテーブルを用いてデータを集計する

あとはSQLを叩くだけです。そう思っていました。
しかしMySQLにどっぷり浸かり続けた私に、厳格なシンタックスを持つAthenaの”SQL”が牙を剝いてきたのです。

例1

まずは下記のようなクエリを実行するとします。 これは店毎にすべての商品数と、先月の商品数、そして今月の商品数を取得しています。

select shop, count(*) total, 
        sum(created_at < DATE_FORMAT(CURDATE(), '%Y-%m-01') and created_at > DATE_FORMAT( ADDDATE( CURDATE() , INTERVAL -1 MONTH) , '%Y-%m-01' )) last_month,
        sum(created_at > DATE_FORMAT(CURDATE(), '%Y-%m-01')) this_month
from products 
where not shop = 'amazon' 
group by shop 
having not last_month = 0;

これはMySQLだと何の問題もなく動く文ですが、飽く迄もMySQLでの構文ですから、当然Athenaでの文法には合致しません。 取り敢えずCURDATE()をAthenaでのカウンターパートであるCURRENT_DATEに変更しましょう。 ADDDATEも使いません。

select shop, count(*) total,
        sum(created_at < DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') and created_at > DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01' )) last_month,
        sum(created_at > DATE_FORMAT(CURRENT_DATE, '%Y-%m-01')) this_month
from products 
where not shop = 'amazon' 
group by shop 
having not last_month = 0;

これはエラーを起こします。 SYNTAX_ERROR: line 3:28: '<' cannot be applied to timestamp, varchar (AWSOperator::ExecutionError)と言われてしまいます。 DATE_FORMAT関数はvarchar型を返してくるので、timestamp型のcreated_atと比較はできないということですね。 それではCASTを用いてtimestampに変換してあげましょう。

select shop, count(*) total,
        sum(created_at < CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP) and created_at > CAST(DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01') AS TIMESTAMP)) last_month,
        sum(created_at > CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP)) this_month
from products 
where not shop = 'amazon' 
group by shop 
having not last_month = 0;

これもエラーを起こします。艱難辛苦の連続にそろそろやる気が削がれてきます。 SYNTAX_ERROR: line 3:12: Unexpected parameters (boolean) for function sum. Expected: sum(double) , sum(real) , sum(bigint) , sum(interval day to second) , sum(interval year to month) , sum(decimal(p,s)) (AWSOperator::ExecutionError)と言われてしまいます。 sumの中にbooleanが入っているのは駄目だと言うことです。 よく考えたら不等式を使ったらbooleanが返ってくるのは当たり前で、それをよしなに解釈してくれるのはMySQLの優しさであったのです。
なればcase whenを使って、trueなら1、falseなら0というintegerに変換してsumをとらせてあげましょう。

select shop, count(*) total,
        sum( case when
            created_at < CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP) and created_at > CAST(DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01' ) AS TIMESTAMP)
            then 1 else 0 end
            ) last_month,
        ) last_month,
        sum( case when
            created_at > CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP)
            then 1 else 0 end
        ) this_month
from products 
where not shop = 'amazon' 
group by shop 
having not last_month = 0;

SYNTAX_ERROR: line 12:56: Column 'last_month' cannot be resolved (AWSOperator::ExecutionError) はい。もう判っていました。
これはHAVINGの仕業です。普通のSQLではGROUP BYやHAVINGで指定する列にエイリアスを用いることはできません。 MySQLはこれが可能になっているので、このエラーに当たるまで私もこの仕様を知らず、当たり前のようにGROUP BYやHAVINGでエイリアスを指定していました。 MySQLというものは、まことに人類の堕落の象徴なのであります。
という訳で、HAVINGの中でもう一度sumの長い構文を書いてあげましょう。

select shop,
        count(*) total,
        sum( case when
            created_at < CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP) and created_at > CAST(DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01' ) AS TIMESTAMP)
            then 1 else 0 end
        ) last_month,
        sum( case when
            created_at > CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP)
            then 1 else 0 end
        ) this_month
from products
where not shop = 'amazon'
group by shop
having not sum( case when
                created_at < CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP) and created_at > CAST(DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01' ) AS TIMESTAMP)
                then 1 else 0 end
            ) = 0;

これで漸く実行できます。長く苦しい道程でした。 SQLとは本来斯くも恐ろしいものなのです。

例2

以下のSQLでは、shop毎に先月のmodel_numberの種類数と、先月と今月のmodel_numberの種類数を取得し、その差を取ることで店毎に商品の種類が先月から何種類増加したのかを取得しています。

CREATE DEFINER=`admin`@`%` FUNCTION `new_models_count`(shop_name CHAR(255)) RETURNS int(11)
BEGIN
DECLARE last_month INT;
DECLARE without_dupe INT;
select count(distinct model_number) into last_month from products where shop = shop_name and created_at < DATE_FORMAT(CURDATE(), '%Y-%m-01') and created_at > DATE_FORMAT(ADDDATE(CURDATE(), INTERVAL -1 MONTH), '%Y-%m-01');
select count(distinct model_number) into without_dupe from products where shop = shop_name and created_at > DATE_FORMAT(ADDDATE(CURDATE(), INTERVAL -1 MONTH), '%Y-%m-01');
return(without_dupe - last_month);
END;

select shop, new_models_count(shop) from products where not shop = 'amazon' group by shop having min(created_at) < DATE_FORMAT(CURDATE(), '%Y-%m-01');

FUNCTIONを使った文です。これのお陰で本体のselect文が簡潔に書けています。 Athenaでもぜひ似たようなことをやりたいですが、残念ながらどう工夫してもこれはできません。
まずAthenaは複数の文を一遍に実行させることはできません。1リクエスト1文です。 またAthenaのユーザ定義関数はJavaを用いて書かないといけません5。 一応MySQLでも「ユーザ定義関数」と呼ばれるものはCやC++で書かないといけないようですが、Athenaだとちゃちゃっと宣言してぱぱっと実行できるようなラムダ式っぽい関数は見当たりません。 そしてJavaを一々書いてjarをビルドするのはとても面倒です。 せっかくRubyで操作するなら尚更です。

ではどうするのか。中間テーブルを作りながら地道にやっていきましょう。 艱難辛苦は避けられないものなのです。
先ずは上の文のlast_monthに当たるテーブルを作りましょう。 CREATE TABLE AS (CTAS)を用いると簡単です。

CREATE TABLE last_month
AS SELECT site, count(distinct code) as count_code
FROM default.products
WHERE created_at < CAST(DATE_FORMAT(CURRENT_DATE, '%Y-%m-01') AS TIMESTAMP)  AND created_at > CAST(DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01') AS TIMESTAMP)
GROUP BY site;

without_dupeも全く同じように作成できます。

CREATE TABLE without_dupe
AS SELECT site, count(distinct code) as count_code
FROM default.products
WHERE created_at > CAST(DATE_FORMAT( CURRENT_DATE - INTERVAL '1' MONTH , '%Y-%m-01') AS TIMESTAMP)
GROUP BY site;

そうしたらMySQLと同様にjoinしてあげましょう。

SELECT lm.site, wd.count_code - lm.count_code
FROM default.without_dupe wd
LEFT JOIN default.last_month lm
ON wd.site = lm.site;

中間テーブルは終わったらdrop tableしてあげると、次回の実行時に色々言われなくて済みます。

Pocket

CONTACT

お問い合わせ・ご依頼はこちらから