Apache Spark StreamingのtCassandraOutputプロパティ - Cloud - 8.0

Cassandra

Version
Cloud
8.0
Language
日本語
Product
https://talend.poolparty.biz/coretaxonomy/17
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > サードパーティーシステム > NoSQLコンポーネント > Cassandra
データガバナンス > サードパーティーシステム > NoSQLコンポーネント > Cassandra
データクオリティとプレパレーション > サードパーティーシステム > NoSQLコンポーネント > Cassandra
Last publication date
2024-04-15

このプロパティはSpark Streamingジョブフレームワークで実行されているtCassandraOutputを設定するために使います。

Spark StreamingtCassandraOutputコンポーネントは、データベースファミリーに属しています。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで利用できます。

基本設定

[Property type] (プロパティタイプ)

[Built-in] (組み込み)または[Repository] (リポジトリー)のいずれかです。

[Built-In] (組み込み): 一元的に保存されるプロパティデータはありません。

[Repository] (リポジトリー): プロパティが保存されているリポジトリーファイルを選択します。

[Sync columns] (カラムを同期)

ジョブで接続された前のコンポーネントからスキーマを取得するには、このボタンをクリックします。

[Keyspace]

データの書き込み先にキースペースの名前を入力します。

[Action on keyspace] (キースペースのアクション)

使用するキースペースで実行する操作を選択します:

  • [None] (なし): 操作は行われません。

  • [Drop and create keyspace] (キースペースの削除と作成): キースペースを削除してから、再作成します。

  • [Create keyspace] (キースペースの作成): キースペースが存在しないため、作成します。

  • [Create keyspace if not exists] (キースペースが存在しない場合に作成): キースペースが存在しない場合は作成します。

  • [Drop keyspace if exists and create] (キースペースが存在する場合に削除して作成): キースペースが既に存在する場合は、削除されて再作成されます。

[Column family] (カラムファミリー)

データの書き込み先にキースペースの名前を入力します。

[Action on column family] (カラムファミリーのアクション)

使用するカラムファミリーで実行する操作を選択します:

  • [None] (なし): 操作は行われません。

  • [Create column family if not exists] (カラムファミリーが存在しない場合に作成): カラムファミリーが存在しない場合に作成します。

  • [Drop column family if exists and create] (カラムファミリーが存在する場合に削除と作成): カラムファミリーが既に存在している場合は削除され、再作成されます。

  • [Truncate column family] (カラムファミリーの切り捨て): カラムファミリーのすべてのデータが完全に削除されます。

このリストは、[Action on data] (データでのアクション)ドロップダウンリストから[Update] (アップデート)[Upsert] (アップサート)[Insert] (挿入)のいずれかを選択した場合のみ利用できます。

[Action on data] (データでのアクション)

定義されたテーブルのデータで実行できる操作は次のとおりです。

  • [Upsert] (アップサート): カラムまたは既存のカラムが存在しない場合は、カラムを挿入します。

    このアクションでは、スキーマで定義される列の名前に小文字を使用する必要がありますが、スキーマの[DB column] (DBカラム)カラムに配置した名前は、文字ケースも含めて、ターゲットテーブル内の対応するカラムと同じにする必要があります。

  • [Insert] (挿入): カラムが存在しない場合は、カラムを挿入します。このアクションは、既存のものもアップデートします。

  • [Update] (アップデート): 既存のカラムをアップデートするか、存在しないカラムを追加します。このアクションでは[Counter] (カウンター)Cassandraデータ型がサポートされていません。

  • [Delete] (削除): 入力フローに対応するカラムを削除します。

詳細な操作は、[Advanced settings] (詳細設定)ビューを使用します。

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

スキーマを変更するには[Edit schema] (スキーマを編集)をクリックします。現在のスキーマがリポジトリータイプの場合は、3つのオプションを利用できます。

  • [View schema] (スキーマの表示): スキーマのみを表示する場合は、このオプションを選択します。

  • [Change to built-in property] (組み込みのプロパティに変更): ローカルで変更を行うためにスキーマを組み込みに変更する場合は、このオプションを選択します。

  • [Update repository connection] (リポジトリー接続をアップデート): リポジトリーに保存されているスキーマに変更を加え、変更後にそのコンテンツをすべてのジョブにプロパゲートするかどうかを決める場合は、このオプションを選択します。

    変更を現在のジョブにのみ反映する場合は、変更後、[No] (いいえ)を選択し、[Repository Content] (リポジトリーのコンテンツ)ウィンドウで再びこのスキーマのメタデータを選択します。

このコンポーネントのスキーマは、オブジェクトタイプとリストタイプに対応していません。

 

[Built-in] (組み込み): このコンポーネントに対してのみスキーマを作成し、ローカルに保管します。

 

[Repository] (リポジトリー): スキーマは作成済みで、リポジトリーに保管されています。さまざまなプロジェクトやジョブデザインで再利用できます。

再使用するスキーマに整数またはファンクションのデフォルト値が指定されている場合は、これらのデフォルト値を引用符で囲まないようにご注意ください。引用符で囲まれている場合は手動で削除します。

詳細は、Talend Studioユーザーガイドでテーブルスキーマに関連する説明をご覧ください。

詳細設定

[Configuration] (設定)

Cassandraにアップサートするデータでカスタマイズする必要があるCassandraのプロパティを追加します。
  • たとえば、書き込みにおけるCassandraの一貫性レベルを定義する必要がある場合は、[Property name] (プロパティ名)カラムで[output_consistency_level]プロパティを選択し、[Value] (値)カラムに数値レベルの値を入力します。

以下のリストは、入力できる数値と、その値で示される一貫性レベルを示しています:

  • 0: ANY、

  • 1: ONE、

  • 2: TWO、

  • 3: THREE、

  • 4: QUORUM、

  • 5: ALL、

  • 6: LOCAL_QUORUM、

  • 7: EACH_QUORUM、

  • 8: SERIAL、

  • 9: LOCAL_SERIAL、

  • 10: LOCAL_ONE

各整合性ポリシーの詳細は、Cassandraに関するDatastaxのドキュメンテーションをご覧ください。

テーブルに行を追加する場合は、[Property name] (プロパティ名)カラムの新しい行をクリックして、 使用可能なプロパティの一覧を表示し、カスタマイズするプロパティを選択する必要があります。これらの各プロパティの詳細は、https://github.com/datastax/spark-cassandra-connector/Blob/master/doc/5_saving.mdのリンクのチューニングセクションをご覧ください。

[Use unlogged batch] (UNLOGGEDバッチを使用)

このチェックボックスをオンにすると、データをバッチ処理できますが、CassandraのUNLOGGEDアプローチが使用されます。このフィーチャーは[Insert] (挿入)[Update] (アップデート)および[Delete] (削除)の3つの操作で利用できます。

次に、バッチモードの動作を設定する必要があります:
  • [Batch size] (バッチサイズ): 処理する各バッチの行数を入力します。

  • [Group batch method] (グループバッチ方式): 行をグルーピングして一括処理する方法を選択します。
    1. [Partition] (パーティション): 同じパーティションキーを共有する行をグルーピングします。

    2. [Replica] (レプリカ): 同じレプリカに書き込まれる行をグルーピングします

    3. [None] (なし): ランダムに行をグルーピングします。このオプションは、単一ノードのCassandraに適しています。

  • [Cache batch group] (キャッシュバッチグループ): このチェックボックスをオンにすると、グルーピングする前に行がメモリにロードされます。この方法では、行の順序でグルーピングが影響を受けることはありません。

    このチェックボックスをオフにしたままにすると、同じ条件を満たす連続した行のみがグルーピングされます。

  • [Async execute] (非同期実行): このチェックボックスをオンにすると、tCassandraOutputでバッチが並行して送信されます。選択を解除したままにすると、tCassandraOutputでは、バッチの結果を待機してから、別のバッチをCassandraに送信します。

  • [Maximum number of batches executed in parallel] (並行して実行できるバッチの最大数): [Async execute] (非同期実行)を選択したら、Cassandraに並行して送信するバッチの数を入力します。

    この数値を負の数または0にすべきではなく、大きすぎる値も使用しないことをお勧めします。

Cassandraでバッチを使用する理想的な状況には、少数のテーブルで挿入またはアップデートするデータを同期する場合などが挙げられます。

このUNLOGGEDアプローチでは、ジョブは、Cassandraのバッチログシステムにバッチを書き込まないため、この書き込みによって発生するパフォーマンスの問題を回避します。CassandraのBATCHステートメントおよびUNLOGGEDアプローチの詳細は、バッチをご覧ください。

[Insert if not exists] (存在しない場合は挿入)

このチェックボックスをオンにすると、行が挿入されます。この行の挿入は、ターゲットテーブルに存在しない場合のみ行われます。

このフィーチャーは[Insert] (挿入)操作でのみ利用できます。

[Delete if exists] (存在する場合は削除)

このチェックボックスをオンにすると、受信フロー内の同じレコードを持つ行のみがターゲットテーブルから削除されます。

このフィーチャーは[Delete] (削除)操作でのみ利用できます。

[Use TTL] (TTLを使用)

このチェックボックスをオンにすると、ターゲットテーブルにTTLデータが書き込まれます。表示されるカラムリストで、TTLカラムとして使用されるカラムを選択する必要があります。このカラムのDB型はIntにする必要があります。

このフィーチャーは[Insert] (挿入)操作および[Update] (アップデート)操作でのみ利用できます。

[Use Timestamp] (タイムスタンプを使用)

このチェックボックスをオンにすると、ターゲットテーブルにタイムスタンプデータが書き込まれます。表示されるカラムリストで、タイムスタンプデータの保管に使用するカラムを選択する必要があります。このカラムのDBタイプは[BigInt]にする必要があります。

このフィーチャーは、[Insert] (挿入)[Update] (アップデート)および[Delete] (削除)のアクションで利用可能です。

[IF condition] (IF条件)

[Update] (アップデート)または[Delate] (削除)操作を実行するために満たす条件を追加します。この条件を使用すると、アップデートまたは削除するカラムについて正確な操作が可能になります。

[Special assignment operation] (特別な割り当て操作)

このテーブルを完成して、[Update] (アップデート)操作をより具体的にするためのCassandraの高度なSETコマンドを構成します。たとえば、指定されたカラムの先頭または特定の位置にレコードを追加します。

このテーブルの[Update column] (カラムのアップデート)カラムで、アップデートされたカラムを選択し、[Operation] (操作)カラムから使用する操作を選択する必要があります。以下の操作を実行できます:
  • [Append] (後ろに追加): アップデートするカラムの末尾に受信レコードを追加します。処理できるCassandraのデータ型は、Counter、List、Set、Mapになります。

  • [Prepend] (前に追加): アップデートするカラムの先頭に受信レコードを追加します。処理できる唯一のCassandraのデータ型はListになります。

  • [Remove] (削除): 受信フローに同じレコードが存在する場合、ターゲットテーブルからレコードを削除します。処理できるCassandraのデータ型は、Counter、List、Set、Mapになります。

  • [Assign based on position/key] (位置/キーに基づく割り当て): アップデートするカラムの特定の位置にレコードを追加します。処理できるCassandraのデータ型は、ListおよびMapになります。

    この操作を選択すると、[Map key/list position] (キー/リスト位置のマッピング)カラムが編集可能になります。このカラムから参照に使用するカラムを選択し、アップデートする位置を検索する必要があります。

これらの操作の詳細は、Datastaxの関連ドキュメンテーション(http://docs.datastax.com/en/cql/3.1/cql/cql_reference/update_r.html?scroll=reference_ds_g4h_qzq_xj__description_unique_34)をご覧ください。

[Row key in the List type] (リストタイプの行キー)

CassandraのWHERE句を構成するために使用するカラムを選択し、[Update] (アップデート)または[Delete] (削除)操作を実行します。このテーブルで使用されるカラムは、Cassandraテーブルのプライマリキーカラムのセットから選ぶ必要があります。

[Delete collection column based on postion/key] (位置/キーに基づいてコレクションカラムを削除)

削除する特定の行を参照する際に使用するカラムを選択します。

このフィーチャーは[Delete] (削除)操作でのみ利用できます。

使用方法

[Usage rule] (使用ルール)

このコンポーネントは終了コンポーネントとして使用され、入力リンクを必要とします。

このコンポーネントでは、同じジョブに存在するtCassandraConfigurationコンポーネントを1つのみ使用して、Cassandraに接続する必要があります。同じジョブに複数のtCassandraConfigurationコンポーネントが存在すると、ジョブの実行が失敗します。

このコンポーネントは、所属するSpark Batchのコンポーネントのパレットと共に、Spark Batchジョブを作成している場合にだけ表示されます。

特に明記していない限り、このドキュメンテーションのシナリオでは、標準ジョブ、つまり従来の Talend データ統合ジョブだけを扱います。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): tHDFSConfigurationtS3Configurationなど、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。