Apache Spark BatchのtDeltaLakeOutputプロパティ - Cloud - 8.0

Delta Lake

Version
Cloud
8.0
Language
日本語
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > サードパーティーシステム > テクニカルコンポーネント > Delta Lake components
データガバナンス > サードパーティーシステム > テクニカルコンポーネント > Delta Lake components
データクオリティとプレパレーション > サードパーティーシステム > テクニカルコンポーネント > Delta Lake components
Last publication date
2024-02-28

これらのプロパティは、Spark Batchジョブのフレームワークで実行されているtDeltaLakeOutputを設定するために使われます。

Spark BatchtDeltaLakeOutputコンポーネントは、テクニカルファミリーに属しています。

このフレームワークのコンポーネントは、すべてのサブスクリプションベースのビッグデータ対応のTalend製品およびTalend Data Fabricで使用できます。

基本設定

データセットの保存方法を定義

[Metastore] (メタストア)[Files] (ファイル)、またはMergeのいずれか。

 

[Metastore] (メタストア):データをテーブル形式でメタストアに保存します。

 

[Files] (ファイル):データをデルタ形式でファイルに保存します。

 

[Merge] (マージ):データを既存のDeltaテーブルにマージして保存します。

Deltaテーブルのデータをマージする方法は、Databricksのドキュメントのmergeを使用したテーブルへのアップサートをご覧ください。

[Define a storage configuration component] (ストレージ設定コンポーネントを定義)

ターゲットファイルシステムへの接続に関する設定情報の提供で使用する設定コンポーネントを選択します。

このチェックボックスをオフにすると、ターゲットファイルシステムはローカルシステムになります。

使用する接続設定は同じジョブ内にあることが必要です。たとえばジョブにtS3Configurationコンポーネントをドロップした場合は、それを選択し、指定したS3ストレージシステムに結果を書き込めるようになります。

このフィールドは、[Basic settings] (基本設定)ビューで[Define the source of the dataset] (データセットのソースを定義する)ドロップダウンリストから[Files] (ファイル)を選択した場合のみ利用できます。

[Property type] (プロパティタイプ)

[Built-in] (組み込み)[Repository] (リポジトリー)のいずれかです。

 

[Built-In] (組み込み): 一元的に保存されるプロパティデータはありません。

 

[Repository] (リポジトリー): プロパティが保存されているリポジトリーファイルを選択します。

プロパティは、リポジトリーツリーのHadoopクラスターノードに一元的に保存されます。

後続するフィールドは、取得されたデータを使用して事前に入力されます。

Hadoopクラスターノードの詳細は、Hadoop接続メタデータを管理をご覧ください。

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

スキーマを変更するには[Edit schema] (スキーマを編集)をクリックします。現在のスキーマがリポジトリータイプの場合は、3つのオプションを利用できます。

  • [View schema] (スキーマを表示): スキーマのみを表示する場合は、このオプションを選択します。

  • [Change to built-in property] (組み込みのプロパティに変更): ローカルで変更を行うためにスキーマを組み込みに変更する場合は、このオプションを選択します。

  • [Update repository connection] (リポジトリー接続をアップデート): リポジトリーに保存されているスキーマに変更を加え、変更後にそのコンテンツをすべてのジョブにプロパゲートするかどうかを決める場合は、このオプションを選択します。

    変更を現在のジョブにのみ反映する場合は、変更後、[No] (いいえ)を選択し、[Repository Content] (リポジトリーコンテンツ)ウィンドウで再びこのスキーマのメタデータを選択します。

Sparkは、PARQUETスキーマ内のカラムのデータ型を自動的に推測します。Apache SparkのTalendジョブでは、日付型がint96として推測され、保管されます。

このコンポーネントは、ダイナミックスキーマ機能の利点を備えているので、ソースファイルから不明なカラムを取得したり、各カラムを個別にマッピングしなくてもソースからカラムを一括してコピーしたりできます。ダイナミックスキーマの詳細は、ダイナミックスキーマをご覧ください。

ダイナミックスキーマ機能は、テーブルの不明なカラムを取得するしくみになっており、その他の目的には使用しないことをお勧めします。テーブルの作成には推奨しません。

 

[Built-in] (組み込み): そのコンポーネントに対してのみスキーマを作成し、ローカルに保管します。

 

[Repository] (リポジトリー): スキーマは作成済みで、リポジトリーに保管されています。さまざまなプロジェクトやジョブデザインで再利用できます。

[Folder/File] (フォルダー/ファイル)

ファイルシステムで使用するデータを参照するか、パスを入力します。

設定したパスがフォルダーを指す場合、このコンポーネントによりフォルダーに保管されているすべてのファイル(/user/talend/inなど)が読み取られます。サブフォルダーが存在する場合、[Spark configuration] (Spark設定)タブの[Advanced properties] (詳細プロパティ)テーブルでプロパティspark.hadoop.mapreduce.input.fileinputformat.input.dir.recursivetrueに設定しない限り、サブフォルダーは自動的に無視されます。
  • 使用するファイルシステムに応じて、ジョブに配置された対応する設定コンポーネント(S3用のtS3Configuration、Azure StorageおよびAzure Data Lake Storage用のtAzureFSConfigurationなど)を適切に設定します。

参照用のボタンはSpark Localモードでは機能しません。お使いのディストリビューションで、Talend Studioがサポートしているその他のSpark Yarnモードを使用している場合は、同じジョブ内の設定コンポーネントで接続を適切に設定したことを確認する必要があります。使用されるファイルシステムに応じて設定コンポーネントを使用します。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Files] (ファイル)を選択した場合のみ利用できます。

[Action] (アクション)

ジョブの設定コンポーネントが接続情報を提供するファイルシステムにデータを書き込むための操作を選択します。
  • [Create] (作成): [Folder/File] (フォルダー/ファイル)フィールドで指定された[Target table] (ターゲットテーブル)またはディレクトリーを作成し、そこにデータを書き込みます。
  • [Append] (追加): [Target table] (ターゲットテーブル)または[Folder/File] (フォルダ/ファイル)フィールドで指定されたディレクトリーに既に存在するデータに受信レコードを追加します。
  • [Overwrite] (上書き):受信データを使用して、[Folder/File] (フォルダー/ファイル)フィールドで指定された[Target table] (ターゲットテーブル)またはディレクトリーを上書きします。
  • [Drop table] (ドロップテーブル):テーブルとそのテーブルに関連付けられているディレクトリーをファイルシステムから削除し、テーブルを再作成します。
  • [Drop table if exists] (存在する場合はテーブルを削除):テーブルとそのテーブルに関連付けられているディレクトリーをファイルシステムから削除し、テーブルを再作成します。テーブルが存在しない場合、何も起こりません。
  • [Truncate table] (テーブルを切り詰め):テーブル内のすべての行を削除しますが、テーブルのスキーマは残ります。

Delta Lakeは、ファイルのアップロード時間とこのファイルのメタデータタイムスタンプとの間にわずかな違いを体系的に作成します。データをフィルタリングする必要がある場合は、この違いに留意してください。

このオプションは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Files] (ファイル)または[Metastore] (メタストア)を選択した場合のみ利用できます。

[Source type] (ソースタイプ)

入力データのソースタイプは、[Dataset] (データセット)または[SQL]のいずれかです。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した場合のみ利用できます。

 

[Dataset] (データセット):ソースデータがデータフローからのデータセットである場合は、この値を選択します。

 

SQL:SQLステートメントを使用してデルタテーブルからソースデータを取得する場合は、この値を選択します。

[SQL]フィールドに、取得するソース データを定義するSQLスタートメントを入力します。
注: データの取得元であるDeltaテーブルは、メタストアから、または入力に接続されているコンポーネントから取得できます。
[Table alias] (テーブルエイリアス)フィールドに、ソースデータを含むテーブルのエイリアスを入力します。エイリアスを二重引用符で囲みます。
注: テーブルエイリアスは、SQLクエリーで使用される[SELECT] (セレクト)ステートメントの結果であるテーブルの名前です。

[Database] (データベース)

使うDelta Lakeデータベースの名前を二重引用符で囲んで入力します。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Metastore] (メタストア)および[Merge] (マージ)を選択した場合のみ利用できます。

[Target table] (ターゲットテーブル)

使うテーブルの名前を二重引用符で囲んで入力します。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Metastore] (メタストア)および[Merge] (マージ)を選択した場合のみ利用できます。

[External paths] (外部パス)

データを格納するために、DBFSとは異なるファイルシステムを指すパスを二重引用符で囲んで入力します。ADLSGen2ファイルシステムまたはS3ファイルシステムのいずれかです。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Metastore] (メタストア)を選択した場合のみ利用できます。

[Optimize] (最適化)

SQLステートメントを入力して、データのレイアウトを最適化します。

Delta Lakeデータの最適化の詳細は、DatabricksドキュメントのOptimize(Delta Lake on Databricks)をご覧ください

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Metastore] (メタストア)を選択した場合のみ利用できます。

[Merge ON] (マージON)

マージ操作を適用する入力カラムと出力カラムを示します。入力カラムと出力カラムのペアごとに、ファンクションを指定する必要があります。結果の句は、実行するマージアクションを条件付けます。これらのマージアクションは、[When matched] (一致する場合)および[When not matched] (一致しない場合)オプションで定義されます。

たとえば、航空会社がフライトイベントに関連するデータを処理するとします。彼らは定期的にフライトの出発日の変更を処理し、現在のデータを新しい日付でアップデートする必要があります。フライトのIDに基づいてこの新しいデータを既存のイベントにマージするために、 flightIdを入力列と出力列として示し、==ファンクション。次に、この条件の結果がtrueを返したときに実行するマージアクションを定義できます。この場合、入力列と出力列のflightIdの値が等しいときに条件は真になります。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した場合のみ利用できます。

[When matched] (一致した場合)

テーブルの[マージ]で 定義されたファンクションがtrueを返したときに実行するマージアクションを定義するには、このチェックボックスをオンにします。[When matched] (一致した場合)オプションには、それぞれ1つのマージアクションを定義できます。2つのWhen matchedオプションを定義した場合、最初の1つは節条件を持っていなければならず、2つのオプションは定義された順番で評価されます。

条件フィールドに条件ステートメントを入力して、マージアクションを適用するデータをさらに絞り込むことができます。条件を指定すると、その行の条件が真の場合にのみ、指定された行に対してマージアクションが実行されます。条件を使用すると、マージを高速化できます。条件は、targetTable.column = this.columnの形式に従う必要があります。ここで、targeTableはターゲットテーブルの名前です。[Dataset source type] (データセットソースタイプ)の場合、これは接続の名前です。SQLソースタイプでは、これを[Table alias](テーブルのエイリアス)フィールドに置き換える必要があります。

[Merge action] (マージアクション) ドロップダウンリストで、実行するアクションを選択します。
  • [DELETE] (削除):ターゲットテーブルの行を削除します。
  • [UPDATE SET ] (アップデートセット)* :出力行のすべての値を入力行の値に置き換えます。このアクションでは、ソーステーブルにターゲットテーブルと同じ列が必要です。
  • [UPDATE] (アップデート):出力行の一部の値を、入力行の対応する値に置き換えます。この場合、アップデートを適用するカラムを指定する必要があります。

たとえば、航空会社のデータテーブルに、フライトの乗客数と、フライトがさらに混雑したかどうかを示すブール値が含まれているとします。次の条件は、マージアクションの範囲を、乗客の数に基づいて混雑しているフライトに制限します。flightEvents.nbOfPeople< this.nbOfPeople。マージアクションは、UPDATE SET *アクションを使用してisFlightMoreCrowdedの値をtrueにアップデートすることです。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した場合のみ利用できます。

一致しない場合

[テーブルの マージ]で 定義されたファンクションがfalseを返したときに実行するマージアクションを定義するには、このチェックボックスをオンにします。

条件フィールドに条件ステートメントを入力して、マージアクションを適用するデータをさらに絞り込むことができます。条件を指定すると、その行の条件が真の場合にのみ、指定された行に対してマージアクションが実行されます。条件を使用すると、マージを高速化できます。条件は、targetTable.column = this.columnの形式に従う必要があります。ここで、targeTableはターゲットテーブルの名前です。[Dataset source type] (データセットソースタイプ)の場合、これは接続の名前です。SQLソースタイプでは、これを[Table alias](テーブルのエイリアス)フィールドに置き換える必要があります。

[Merge action] (マージアクション) ドロップダウンリストで、実行するアクションを選択します。
  • [DELETE] (削除):ターゲットテーブルの行を削除します。
  • [INSERT SET ] (セットを挿入)*:入力データのすべての列を対応する出力列に挿入します。このアクションでは、ソーステーブルにターゲットテーブルと同じ列が必要です。
  • [INSERT] (インサート):入力データの一部の列を対応する出力列に挿入します。この場合、挿入するカラムを指定する必要があります。

たとえば、航空会社のデータテーブルが日付で分割されていて、マージアクションが以下の条件を持っているとします: flightEvents.date = current_date()。マージアクションは、マージオンテーブルで定義されたファンクションがfalseを返す場合に適用され、現在の日付に発生したフライトに対応するデータにのみ適用されます。

このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットの保存方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した場合のみ利用できます。

詳細設定

[Define column partitions] (カラムパーティションを定義) このチェックボックスをオンにして、入力データのスキーマからのカラムを使用して、表示されるテーブルに入力します。選択したカラムのレコードは、データのパーティションを行うためのキーとして使用されます。
[Sort columns alphabetically] (カラムをアルファベット順にソート) スキーマのカラムをアルファベット順にソートする場合は、このチェックボックスをオンにします。このチェックボックスをオフのままにすると、これらのカラムはスキーマエディターで定義された順序に従います。
[Merge Schema] (スキーマをマージ) データセットのスキーマは、時間の経過とともに変化することがよくあります。スキーマが異なる場合に受信データと既存データのスキーマをマージするには、このチェックボックスをオンにします。

このチェックボックスと[Overwrite Schema] (スキーマの上書き)チェックボックスをオフのままにすると、既存のデータのカラムのみが使用されます。

[Overwrite Schema] (スキーマの上書き)

データセットのスキーマは、時間の経過とともに変化することがよくあります。受信データのスキーマを使用して既存のデータのスキーマを上書きするには、このチェックボックスをオンにします。

このチェックボックスと[Merge Schema] (スキーマをマージ)チェックボックスをオフのままにすると、既存のデータのカラムのみが使用されます。

使用方法

使用ルール

このコンポーネントは、終了コンポーネントとして使用され、入力リンクを必要とします。

Delta Lakeは、ファイルのアップロード時間とこのファイルのメタデータタイムスタンプとの間にわずかな違いを体系的に作成します。データをフィルタリングする必要がある場合は、この違いに留意してください。

このDelta Lakeレイヤーは、Data Lakeシステムの上に構築されているため、Data Lakeシステムに対応する設定コンポーネント(tAzureFSConfigurationなど)を使用して、Data Lakeシステムの一部として接続されます。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。

  • [Standalone mode] (スタンドアロンモード): クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。