大型のルックアップテーブルのバッチ処理 - 7.3

データ統合ジョブの例

Version
7.3
Language
日本語
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > ジョブデザイン
Last publication date
2024-02-13

この記事では、大型のルックアップテーブルにバッチ処理を使う方法について説明しています。

この問題に対するアプローチの1つは、バッチ処理と呼ばれるプロセスを使うことです。バッチ処理により、レコードのバッチを1回の実行で処理することが可能になります。これが反復的に行われて、バッチがさらに作成され、すべてのレコードが処理されます。

各反復により、ソーステーブルとルックアップテーブルから固定数のレコードが処理および抽出され、結合が実行され、ターゲットテーブルがロードされます。

このようにして、プロセスがメモリ内に保持するレコード数(batchrec変数)を制御できます。

これには、[Context variables] (コンテキスト変数)tLoopコンポーネント、tContextLoadコンポーネント、tMapおよびtJavaコンポーネントを使用します。

手順

  1. コンテキスト変数を使用してジョブをダイナミックにします。[DEV] (開発)および[PROD] (本番)など、異なる環境のコンテキスト変数に異なる値を使用することができます。そうすることでジョブは柔軟になり、複数の環境にまたがってコードの変更を行う必要もありません。
    変数 説明
    Recnum この変数は実行の開始点として使用されます。このジョブにはデフォルトで"1"に設定されています。
    Batchrec この変数は、各反復でジョブに処理させるレコードの数として使用されます。この変数をダイナミックにすることで、各バッチで処理するレコード数を制御できます。
    MAX_CUSTID この変数は、最後の反復の後に処理を停止するために使用されます。これは最後のレコードを読み込んだ後になります。この変数は、tContextLoadコンポーネントを使用してロードされます。

    そして、SQL実行は次のとおりです: "SELECT 'MAX_CUSTID' as Key, max(id) as Value FROM `customer`.`cust` "

  2. sssql変数とlsql変数を使用して、tMysqlInputtMSSqlInputがデータベース上で実行するSQLをビルドします。ssqlCOLUMNSlsqlCOLUMNSで選択するカラムは、入力コンポーネントで定義したスキーマと同じである必要があります。

    たとえば顧客(tMysqlInput)コンポーネント内のクエリーは次のように定義されており、クエリー全体がダイナミックになっています。

    " SELECT " + context.ssqlCOLUMNS + context.ssqlFROM + context.ssqlWHERE + ">=" + context.Recnum + context.ssqlAND + "<" + (context.Recnum + context.Batchrec) ;

    類似するSQLクエリーがCUST_LOCATIONS (tMSSqlInput)コンポーネント上で定義されています。

  3. tLoopコンポーネントを次のように設定します。

    上記で定義したコンテキスト変数は、以下に示すようにtLoopコンポーネント内で使用されます。したがって、ソーステーブルとルックアップテーブルからレコードを取得するために、最大顧客IDに至るまで、各反復にBatchrec変数で定義されたバッチのレコード数が使用されます。

  4. tJavaコンポーネントを次のように設定します。

    このコンポーネントは、各バッチで処理されるレコードの数ごとに開始cust_idを増分するために使用されます。

  5. tMapコンポーネントを次のように設定します。

    このコンポーネントは結合条件に使用します。ルックアップテーブルが静的なため、[Lookup Model] (ルックアップモデル)[Load Once] (1回のロード)に設定します。ルックアップテーブルに重複があることは想定していないので、[Match Model] (一致モデル)[Unique match] (完全一致)に設定します。位置が見つからない場合でもソースデータがターゲットにロードされるようにするため、[Join Model] (結合モデル)[Left Outer Join] (左外部結合)に設定します。

  6. OnSubJobOkトリガーとOnComponentOkトリガーを使用してサブジョブを接続します。

    tLooptMysqlInputコンポーネントに直接接続しないでください。tJava ([Update record counter] (レコードカウンターの更新))をtMysqlOutput_1に直接接続しないでください。tLooptJavaはそれぞれのサブジョブの一部である必要があります。

    ジョブ全体は下図のように表示されます。

  7. ジョブを実行する

タスクの結果

以下はサンプル実行からのログです。ソーステーブル(cust)には1億行、ルックアップテーブル(CUST_LOCATIONS)には7000万行あります。Batchrec="10000000".

ジョブの実行には66分かかり、実行サーバー上で利用可能なメモリの範囲内で実行されました。メモリ不足の例外の影響は受けませんでした。