ユースケース: 財務データを処理するパイプラインの作成 - Cloud

Talend Cloud Pipeline Designer入門ガイド

author
Talend Documentation Team
EnrichVersion
Cloud
EnrichProdName
Talend Cloud
task
ジョブデザインと開発 > パイプラインのデザイン
デプロイメント > デプロイ中 > パイプラインの実行
EnrichPlatform
Talend Pipeline Designer

階層化された財務データ(IBAN、アカウント、トランザクション情報など)をエンリッチ化してフィルタリングした後、実行された取引の合計金額を集計してカウントします。

手順

  1. [Pipelines] (パイプライン)ページで[ADD PIPELINE] (パイプラインの追加)をクリックします。新しいパイプラインが開きます。
  2. 上部にあるツールバーでパイプラインのデフォルト名の横にある鉛筆アイコンをクリックし、パイプラインにわかりやすい名前を付けます。

    例え

    Process Financial Data
  3. [ADD SOURCE] (ソースの追加)をクリックし、ソースデータを選択できるパネルを開きます。ここでは、前に作成したfinancial dataデータセットを使用します。
  4. データセットを選択し、[SELECT DATASET] (データセットの選択)をクリックしてパイプラインに追加します。
    データセットはソースとして追加され、すぐにプレビューできます。
  5. をクリックし、パイプラインにPythonプロセッサーを追加します。このプロセッサーは、入力データを処理してエンリッチ化するPythonコードをコピーするために使用されます。
  6. プロセッサーに意味のある名前を付けます。

    例え

    enrich with IBAN validation
  7. [Map type] (マップタイプ)として[MAP] (マップ)を選択します。これによりPythonプロセッサーでは、入力ごとに出力レコードが自動的に生成されます。

    [MAP] (マップ)および[FLATMAP] (フラットマップ)の違いの詳細は、Pythonプロセッサーのドキュメントを参照してください。

  8. [Python code] (Pythonコード)エリアに、以下のコードを入力します:
    import string;
    
    ## IBAN Validation function;
    ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase};
    def reverse_iban(iban): return iban[4:] + iban[:4];
    def check_iban(iban): return int(''.join(ALPHA.get(c, c) for c in reverse_iban(iban))) % 97 == 1;
    
    output = input;
    transaction = input['transaction'];
    this_account = transaction["this_account"];
    account_routing = this_account["account_routing"];
    account_iban = account_routing["address"].replace(" ", "");
    output['iban_valid'] = check_iban(account_iban)
    このコードによって以下の処理が可能になります。
    • IBAN構文の有効性を確認する

    • 既存のレコードにiban_validという名前の新しいフィールドを追加し、IBANチェックの結果に応じて、値をtrueまたはfalseにする

  9. [Save] (保存)をクリックして設定を保存します。
    入力データが処理されます。変更はプレビューできます。
  10. をクリックして、パイプラインにFilterプロセッサーを追加します。このプロセッサーは、承認されたトランザクション(ACでタグ付け)を拒否されたトランザクション(DCでタグ付け)から分離するために使用されます。
  11. プロセッサーに意味のある名前を付けます。

    例え

    filter on accepted transactions
  12. [Filter] (フィルター)エリアで次の操作を行います。
    1. [NEW ELEMENT] (新しいエレメント)をクリックしてフィルターを追加します。
    2. [Input] (入力)リストで.transaction.details.typeを選択します。この値に基づいて顧客をフィルタリングします。
    3. レコードのフィルタリングで関数を適用しない場合は、[Apply a function first] (最初に関数を適用)リストでNONEを選択します。
    4. [Operator] (演算子)リストから[= =]を選択し、承認されたトランザクションをフィルタリングするため、[Value] (値)リストにACと入力します。

      このエリアではavpath構文を使用できます。詳細は、avpathとは、そして使用する理由とはを参照してください。

    5. [Save] (保存)をクリックして設定を保存します。
    入力データが処理されます。変更はプレビューできます。
  13. をクリックして、パイプラインにAggregateプロセッサーを追加します。このプロセッサーは、トランザクションをグループ化し、トランザクションの合計金額を計算するために使用します。
  14. プロセッサーに意味のある名前を付けます。

    例え

    count transaction amounts with valid IBAN
  15. [GROUP BY] (分類基準)エリアで、[NEW ELEMENT] (新しいエレメント)をクリックして集計セットに使用するフィールドを指定します。
    1. [Field path] (フィールドパス)リストで.transaction.details.descriptionを選択します。
    2. 新しいエレメントを追加し、このリストで.iban_validを選択します。
  16. [OPERATIONS] (操作)エリアで、[NEW ELEMENT] (新しいエレメント)をクリックして集計操作を追加します。
    1. [Field path] (フィールドパス)リストで.transaction.details.value.amountを、[Operation] (操作)リストで[Sum] (合計)を選択します。
    2. 生成されたフィールドに名前(total_amountなど)を付けます。
  17. [Save] (保存)をクリックして設定を保存します。
  18. パイプラインの[ADD DESTINATION] (デスティネーションの追加)アイテムをクリックしてパネルを開き、出力データ用に以前作成した財務データデータセットを選択します。テストデータセットはソースとデスティネーションで異なる動作をするため、データセットは入力と出力で同じものを使用できます。デスティネーションで使用する場合データは無視されます。
  19. デスティネーションに意味のある名前を付けます。

    例え

    processed data (out)
  20. [Save] (保存)をクリックして設定を保存します。
  21. (オプション) Aggregateプロセッサーをクリックし、フィルタリングとグループ化によって計算されたデータをプレビューします。