Apache Spark StreamingのtCassandraOutputプロパティ - 7.3

Cassandra

Version
7.3
Language
日本語 (日本)
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Open Studio for Big Data
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > サードパーティーシステム > NoSQLコンポーネント > Cassandra
データガバナンス > サードパーティーシステム > NoSQLコンポーネント > Cassandra
データクオリティとプレパレーション > サードパーティーシステム > NoSQLコンポーネント > Cassandra

このプロパティはSpark Streamingジョブフレームワークで実行されているtCassandraOutputを設定するために使います。

Spark Streaming tCassandraOutputコンポーネントはデータベースファミリーのコンポーネントです。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで使用できます。

基本設定

[Property type] (プロパティタイプ)

[Built-In] (組み込み)または[Repository] (リポジトリー)のいずれか。

[Built-In] (組み込み): プロパティデータは一元的に保存されません。

[Repository] (リポジトリー): プロパティを保存するリポジトリーファイルを選択します。

[Sync columns] (カラムの同期)

ジョブで接続された前のコンポーネントからスキーマを取得するには、このボタンをクリックします。

[Keyspace]

データの書き込み先にキースペースの名前を入力します。

[Action on keyspace] (キースペースのアクション)

使用するキースペースで実行する操作を選択します:

  • [None] (なし): 操作は行われません。

  • [Drop and create keyspace] (キースペースの削除と作成): キースペースを削除してから、再作成します。

  • [Create keyspace] (キースペースの作成): キースペースが存在しないため、作成します。

  • [Create keyspace if not exists] (キースペースが存在しない場合に作成): キースペースが存在しない場合は作成します。

  • [Drop keyspace if exists and create] (キースペースが存在する場合に削除して作成): キースペースが既に存在する場合は、削除されて再作成されます。

[Column family] (カラムファミリー)

データの書き込み先にキースペースの名前を入力します。

[Action on column family] (カラムファミリーのアクション)

使用するカラムファミリーで実行する操作を選択します:

  • [None] (なし): 操作は実行されません。

  • [Create column family if not exists] (カラムファミリーが存在しない場合に作成): カラムファミリーが存在しない場合に作成します。

  • [Drop column family if exists and create] (カラムファミリーが存在する場合に削除と作成): カラムファミリーが既に存在している場合は削除され、再作成されます。

  • [Truncate column family] (カラムファミリーの切り捨て): カラムファミリーのすべてのデータが完全に削除されます。

このリストは[Action on data] (データの操作)ドロップダウンリストから[Update] (更新)[Upsert] (アップサート)または[Insert] (挿入)を選択した場合にのみ使用できます。

[Action on data] (データへのアクション)

定義されたテーブルのデータでは、以下の操作を実行できます:

  • [Upsert] (アップサート): カラムまたは既存のカラムが存在しない場合は、カラムを挿入します。

    このアクションでは、スキーマで定義される列の名前に小文字を使用する必要がありますが、スキーマの[DB column] (DBカラム)カラムに配置した名前は、文字ケースも含めて、ターゲットテーブル内の対応するカラムと同じにする必要があります。

  • [Insert] (挿入): カラムが存在しない場合は、カラムを挿入します。このアクションは、既存のものも更新します。

  • [Update] (更新): 既存のカラムを更新するか、存在しないカラムを追加します。このアクションでは[Counter] (カウンター)Cassandraデータタイプがサポートされていません。

  • [Delete] (削除): 入力フローに対応するカラムを削除します。

詳細な操作は、[Advanced settings] (詳細設定)ビューを使用します。

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

スキーマを変更するには[Edit schema] (スキーマを編集)をクリックします。現在のスキーマがリポジトリータイプの場合は、3つのオプションを使用できます。

  • [View schema] (スキーマの表示): スキーマのみを表示する場合は、このオプションを選択します。

  • [Change to built-in property] (組み込みのプロパティに変更): ローカルで変更を行うためにスキーマを組み込みに変更する場合は、このオプションを選択します。

  • [Update repository connection] (リポジトリー接続を更新): リポジトリーに保存されているスキーマに変更を加え、変更後にそのコンテンツをすべてのジョブにプロパゲートするかどうかを決める場合は、このオプションを選択します。変更を現在のジョブにのみ反映する場合は、変更後、[No] (いいえ)を選択し、[Repository Content] (リポジトリーのコンテンツ)ウィンドウで再びこのスキーマのメタデータを選択します。

このコンポーネントのスキーマは、オブジェクトタイプとリストタイプに対応していません。

 

[Built-in] (組み込み): そのコンポーネントのみのスキーマを作成して、ローカルに保存します。

 

[Repository] (リポジトリー): スキーマは作成済みで、リポジトリーに保管されています。さまざまなプロジェクトやジョブデザインで再利用できます。

再使用するスキーマに整数またはファンクションのデフォルト値が指定されている場合は、これらのデフォルト値を引用符で囲まないようにご注意ください。引用符で囲まれている場合は手動で削除します。

詳細は、Talend Studioユーザーガイドでテーブルスキーマに関連する説明を参照してください。

詳細設定

[Configuration] (設定)

Cassandraにアップサートするデータでカスタマイズする必要があるCassandraのプロパティを追加します。
  • たとえば、書き込みにおけるCassandraの一貫性レベルを定義する必要がある場合は、[Property name] (プロパティ名)カラムで[output_consistency_level]プロパティを選択し、[Value] (値)カラムに数値レベルの値を入力します。

以下のリストは、入力できる数値と、その値で示される一貫性レベルを示しています:

  • 0: ANY、

  • 1: ONE、

  • 2: TWO、

  • 3: THREE、

  • 4: QUORUM、

  • 5: ALL、

  • 6: LOCAL_QUORUM、

  • 7: EACH_QUORUM、

  • 8: SERIAL、

  • 9: LOCAL_SERIAL、

  • 10: LOCAL_ONE

各整合性ポリシーの詳細は、Cassandraに関するDatastaxのドキュメントを参照してください。

テーブルに行を追加する場合は、[Property name] (プロパティ名)カラムの新しい行をクリックして、 使用可能なプロパティの一覧を表示し、カスタマイズするプロパティを選択する必要があります。これらの各プロパティの詳細は、https://github.com/datastax/spark-cassandra-connector/Blob/master/doc/5_saving.mdのリンクのチューニングセクションを参照してください。

[Use unlogged batch] (UNLOGGEDバッチを使用)

このチェックボックスをオンにすると、データをバッチ処理できますが、CassandraのUNLOGGEDアプローチが使用されます。このフィーチャーは[Insert] (挿入)[Update] (更新)および[Delete] (削除)の3つの操作で使用できます。

次に、バッチモードの動作を設定する必要があります:
  • [Batch size] (バッチサイズ): 処理する各バッチの行数を入力します。

  • [Group batch method] (グループバッチ方式): 行をグループ化して一括処理する方法を選択します。
    1. [Partition] (パーティション): 同じパーティションキーを共有する行をグループ化します。

    2. [Replica] (レプリカ): 同じレプリカに書き込まれる行をグループ化します

    3. [None] (なし): ランダムに行をグループ化します。このオプションは、単一ノードのCassandraに適しています。

  • [Cache batch group] (キャッシュバッチグループ): このチェックボックスをオンにすると、グループ化する前に行がメモリにロードされます。この方法では、行の順序でグループ化が影響を受けることはありません。

    このチェックボックスをオフにしたままにすると、同じ条件を満たす連続した行のみがグループ化されます。

  • [Async execute] (非同期実行): このチェックボックスをオンにすると、tCassandraOutputでバッチが並行して送信されます。選択を解除したままにすると、tCassandraOutputでは、バッチの結果を待機してから、別のバッチをCassandraに送信します。

  • [Maximum number of batches executed in parallel] (並行して実行できるバッチの最大数): [Async execute] (非同期実行)を選択したら、Cassandraに並行して送信するバッチの数を入力します。

    この数値を負の数または0にすべきではなく、大きすぎる値も使用しないことをお勧めします。

Cassandraでバッチを使用する理想的な状況には、少数のテーブルで挿入または更新するデータを同期する場合などが挙げられます。

このUNLOGGEDアプローチでは、ジョブは、Cassandraのバッチログシステムにバッチを書き込まないため、この書き込みによって発生するパフォーマンスの問題を回避します。CassandraのBATCHステートメントおよびUNLOGGEDアプローチの詳細は、バッチを参照してください。

[Insert if not exists] (存在しない場合は挿入)

このチェックボックスをオンにすると、行が挿入されます。この行の挿入は、ターゲットテーブルに存在しない場合にのみ行われます。

このフィーチャーは[Insert] (挿入)操作でのみ使用できます。

[Delete if exists] (存在する場合は削除)

このチェックボックスをオンにすると、受信フロー内の同じレコードを持つ行のみがターゲットテーブルから削除されます。

このフィーチャーは[Delete] (削除)操作でのみ使用できます。

[Use TTL] (TTLを使用)

このチェックボックスをオンにすると、ターゲットテーブルにTTLデータが書き込まれます。表示されるカラムリストで、TTLカラムとして使用されるカラムを選択する必要があります。このカラムのDB型はIntにする必要があります。

このフィーチャーは[Insert] (挿入)操作および[Update] (更新)操作でのみ使用できます。

[Use Timestamp] (タイムスタンプを使用)

このチェックボックスをオンにすると、ターゲットテーブルにタイムスタンプデータが書き込まれます。表示されるカラムリストで、タイムスタンプデータの保管に使用するカラムを選択する必要があります。このカラムのDBタイプは[BigInt]にする必要があります。

このフィーチャーは、[Insert] (挿入)[Update] (更新)および[Delete] (削除)のアクションで利用可能です。

[IF condition] (IF条件)

[Update] (更新)または[Delate] (削除)操作を実行するために満たす条件を追加します。この条件を使用すると、更新または削除するカラムについて正確な操作が可能になります。

[Special assignment operation] (特別な割り当て操作)

このテーブルを完成して、[Update] (更新)操作をより具体的にするためのCassandraの高度なSETコマンドを構成します。たとえば、指定されたカラムの先頭または特定の位置にレコードを追加します。

このテーブルの[Update column] (カラムの更新)カラムで、更新されたカラムを選択し、[Operation] (操作)カラムから使用する操作を選択する必要があります。以下の操作を実行できます:
  • [Append] (後ろに追加): 更新するカラムの末尾に受信レコードを追加します。処理できるCassandraのデータタイプは、Counter、List、SetおよびMapになります。

  • [Prepend] (前に追加): 更新するカラムの先頭に受信レコードを追加します。処理できる唯一のCassandraのデータタイプはListになります。

  • [Remove] (削除): 受信フローに同じレコードが存在する場合、ターゲットテーブルからレコードを削除します。処理できるCassandraのデータタイプは、Counter、List、SetおよびMapになります。

  • [Assign based on position/key] (位置/キーに基づく割り当て): 更新するカラムの特定の位置にレコードを追加します。処理できるCassandraのデータタイプは、ListおよびMapになります。

    この操作を選択すると、[Map key/list position] (キー/リスト位置のマッピング)カラムが編集可能になります。このカラムから参照に使用するカラムを選択し、更新する位置を検索する必要があります。

これらの操作の詳細は、http://docs.datastax.com/en/cql/3.1/cql/cql_reference/update_r.html?scroll=reference_ds_g4h_qzq_xj__description_unique_34の関連ドキュメントを参照してください。

[Row key in the List type] (リストタイプの行キー)

CassandraのWHERE句を構成するために使用するカラムを選択し、[Update] (更新)または[Delete] (削除)操作を実行します。このテーブルで使用されるカラムは、Cassandraテーブルのプライマリキーカラムのセットから選ぶ必要があります。

[Delete collection column based on postion/key] (位置/キーに基づいてコレクションカラムを削除)

削除する特定の行を参照する際に使用するカラムを選択します。

このフィーチャーは[Delete] (削除)操作でのみ使用できます。

使用方法

使用ルール

このコンポーネントは終了コンポーネントとして使用され、入力リンクを必要とします。

このコンポーネントでは、同じジョブに存在するtCassandraConfigurationコンポーネントを1つのみ使用して、Cassandraに接続する必要があります。同じジョブに複数のtCassandraConfigurationコンポーネントが存在すると、ジョブの実行が失敗します。

このコンポーネントは、所属するSpark Batchコンポーネントのパレットと共に、Spark Batchジョブを作成している場合にだけ表示されます。

特に明記していない限り、このドキュメントのシナリオでは、[Standard] (標準)ジョブ、つまり従来の Talend データ統合ジョブだけを扱います。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark Configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): tHDFSConfigurationまたはtS3Configurationなど、クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。