SparkのTalend Data Mapperとの使用 - 7.3

EnrichVersion
7.3
EnrichProdName
Talend Big Data Platform
Talend Data Fabric
Talend Data Management Platform
Talend Data Services Platform
Talend MDM Platform
Talend Real-Time Big Data Platform
task
ジョブデザインと開発 > サードパーティーシステム > 変換処理コンポーネント > データマッピング
データガバナンス > サードパーティーシステム > 変換処理コンポーネント > データマッピング
データクオリティとプレパレーション > サードパーティーシステム > 変換処理コンポーネント > データマッピング

Talend Data MapperでSparkを使用

Apache Spark (別称「Spark」)は、Talend Data Mapperで大型の入力ファイルを扱う場合に便利です。Sparkでは変換作業の前にファイル全体をメモリにロードせずにファイルをストリームしてマッピングを処理するため、そのスピードを十分に活用できます。

大型の入力ファイルのインポート時にSparkとTalend Data Mapperの機能を一緒にテストする場合は、いかに簡単にそのテストを実行できるかこのシナリオでわかります。

Apache Sparkの詳細は、http://spark.apache.org/で公式ドキュメンテーションをご覧ください。Talend Data Mapperの詳細は、Talend Data Mapperユーザーガイドを参照してください。

前提条件

Talend Studioには、ジョブを実行できるローカルのSpark環境が含まれています。次のシナリオを正しく実行できるよう、設定可能な環境の例を示します。

  • Hadoop Distributed File System (HDFS)とSparkサービスが有効になっており、ClouderaがクラスターとしてインストールされているGoogle Cloud PlatformのCentOSサーバーの3つのインスタンス
  • Windows 10クライアント

Hadoopクラスターに接続

Hadoopクラスターへの接続を作成します。

手順

  1. Talend Studioで、[Repository] (リポジトリー) > [Metadata] (メタデータ)に移動します。
  2. [Hadoop Cluster] (Hadoopクラスター)を右クリックし、[Create Hadoop Cluster] (Hadoopクラスターの作成)を選択します。
  3. クラスターの名前を入力して[Next] (次へ)をクリックします。
  4. ディストリビューション(この例ではCloudera)を選択し、バージョンを選択します。
  5. [Retrieve configuration from Ambari or Cloudera] (AmbariまたはClouderaから設定を取得)を選択し、[Next] (次へ)をクリックします。
  6. Cloudera Managerの認証情報を入力します。
  7. [Connect] (接続)をクリックして[Discovered clusters] (発見されたクラスター)セクションを設定します。サービスはこのセクションから取得できます。
  8. [Next] (次へ)をクリックします。
  9. [Check Services] (サービスの確認)をクリックし、サービスがすべて有効になっているかどうか検証します。次にサービスステータスを確認します。

Talend Data Mapperストラクチャーを作成

マップ用にストラクチャーを作成します。

始める前に

入力として使うCSVファイルが必要です。このファイルは次のサンプルを使えば作成できます。
firstName,lastName,age
John,Doe,20
Jane,Doe,35
Kid,Doe,02

手順

  1. Mappingパースペクティブに切り替えて、[Data Mapper] > [Hierarchical Mapper] (階層マッパー)に移動します。
  2. [Structures] (ストラクチャー)を右クリックし、[New] (新規) > [Structure] (ストラクチャー)を選択します。
  3. [Import a structure definition] (構造定義をインポート)を選択して[Next] (次へ)をクリックします。
  4. [CSV]を選択して[Next] (次へ)をクリックします。
  5. 使用するサンプルファイルを選択して[Next] (次へ)をクリックします。
  6. ストラクチャーの名前を入力して[Next] (次へ)をクリックします。
  7. CSVのプロパティで[Skip header reading] (ヘッダーの読み取りをスキップ)チェックボックスを選択し、[Next] (次へ)[Finish] (終了)の順にクリックします。

ビッグデータバッチジョブを作成

Hadoopクラスターとストラクチャーを作成した後は、tHDFSConfigurationtHMapInputtLogRowという3つのコンポーネントを含んでいるビッグデータバッチジョブをデザインします。

手順

  1. Integrationパースペクティブを開き、[Repository] (リポジトリー) > [Job Designs] (ジョブデザイン)に移動します。
  2. [Big Data Batch] (ビッグデータバッチ)を右クリックし、[Create Big Data Batch Job] (ビッグデータバッチジョブの作成)を選択します。
  3. ジョブの作成に必要となる詳細を入力します。
  4. 作成したHadoopクラスターメタデータをジョブデザインにドラッグし、tHDFSConfigurationコンポーネントを選択します。
  5. tHMapInputtLogRowを追加し、[Row] (行) > [Main] (メイン)接続を使ってこれらのコンポーネントを接続します。
    1. 出力名を入力するよう求められたら、Outputと入力します。
  6. tLogRowをダブルクリックしてそのスキーマを定義します:
    1. [Edit schema] (スキーマの編集)の横にある[...]ボタンをクリックします。
    2. [Output (Input)] (出力(入力))セクションで、[ + ]をクリックしてカラムを3つ追加してそれぞれfirstNamelastNameageという名前を付けます。
    3. ボタンをクリックし、この3つのカラムを[tLogRow_1 (Output)] (tLogRow_1 (出力))にコピーします。
  7. tHMapInputをクリックし、[Basic Settings] (基本設定)タブを開きます。
    1. [Define a storage configuration component] (ストレージ設定コンポーネントを定義)チェックボックスをオンにし、tHDFSConfigurationコンポーネントを選択されたストレージとして選択します。
    2. [Input] (入力)フィールドで入力ファイルを指定します。
    3. [Configure Component] (コンポーネントの設定)の横にある[…]ボタンをクリックし、先ほど作成したストラクチャーを選択します。
    4. [Input Representation] (入力表記)ドロップダウンリストで[CSV]を選択します。
    5. [Next] (]次へ)をクリックして[Sample File] (サンプルファイル)フィールドに入力ファイルを追加し、[Run] (実行)をクリックして検出されたレコード数を確認します。
    6. [Finish] (終了)をクリックします。

マップを設定してジョブを実行

入力から出力ストラクチャーにエレメントをマッピングし、ジョブを実行します。

手順

  1. 入力rowエレメントを出力OutputRecordエレメントにドラッグします。
    エレメントはすべて自動的にマッピングされます。
  2. [Test Run] (テスト実行)をクリックして出力をプレビューします。
    この例では、JSONとして表示させると次のようになります:
    {
      "Output" : {
        "OutputRecord" : [ {
          "firstName" : "firstName",
          "lastName" : "lastName",
          "age" : "age"
        }, {
          "firstName" : "John",
          "lastName" : "Doe",
          "age" : "20"
        }, {
          "firstName" : "Jane",
          "lastName" : "Doe",
          "age" : "35"
        }, {
          "firstName" : "Kid",
          "lastName" : "Doe",
          "age" : "02"
        } ]
      }
    }
  3. マップを保存してビッグデータバッチジョブに戻ります。
  4. [Run] (実行)タブを開き、[Run] (実行)をクリックしてジョブを実行します。

トラブルシューティング

サンプルシナリオの実行中にエラーが発生した場合は、ジョブが正しく実行できるソリューションをいくつか参照してください。

  • Clouderaの不正な設定: Clouderaでは、クラスターが同製品の内部用FQDN (完全修飾ドメイン名)で設定されてしまうことがあります。これが該当する場合は、接続エラーを回避できるよう、お使いのホストファイルに追加操作を行う必要があります。

    それを実行するには、C:\\Windows\System32\drivers\etcに移動し、管理者としてホストファイルを開きます。次に、お使いのクラスターの外部IPアドレスと内部FQDNを追加します。ファイルを保存します。

    以上の操作によって、Clouderaが内部FQDNを使用するようになります。

  • ビッグデータバッチジョブに共通するエラー:Talend Studioとは異なるサーバーに存在するHadoopクラスターに接続している場合、次のエラーは無視して構いません。
    このエラーは、Sparkワーカーをローカルに実行するwinutilsを探すだけのものです。このエラーが発生しないようにするには、winutilsをダウンロードして抽出します。その抽出先をHadoopのホームディレクトリーとして設定します。