AvroデータをProducerRecordに書き込み - Cloud - 8.0

Kafka

Version
Cloud
8.0
Language
日本語
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Open Studio for Big Data
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > サードパーティーシステム > メッセージングコンポーネント > Kafka
データガバナンス > サードパーティーシステム > メッセージングコンポーネント > Kafka
データクオリティとプレパレーション > サードパーティーシステム > メッセージングコンポーネント > Kafka

このタスクについて

書き込みジョブを設定します。

手順

  1. 書き込みジョブでtFixedFlowInputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Edit schema] (スキーマを編集)の横にある[...]ボタンをクリックし、[Schema] (スキーマ)ダイアログボックスを開きます。
    2. [+]ボタンをクリックしてカラムを追加し、そのカラムに名前を付けます。例:
    3. [OK]をクリックしてこれらの変更を確認し、ポップアップ表示されるダイアログボックスで求められるプロパゲーションを承認します。
    4. [Mode] (モード)エリアで[Use Single Table] (単一テーブルを使用)を選択し、各カラムの値を指定します。
  2. tJavaRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Code] (コード)フィールドにJavaコードを入力してコンテンツを抽出します。例:
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
      		input_row.topic,
      		input_row.partition,
      		input_row.timestamp,
      		input_row.key,
      		input_row.value
      );
      
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);
      
      output_row.record = record;
  3. tKafkaOutputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Input type] (入力タイプ)ドロップダウンリストでProducerRecordを選択します。
    2. [Version] (バージョン)ドロップダウンリストで、使用するKafkaクラスターのバージョンを選択します。
    3. [Broker list] (ブローカーリスト)フィールドに、使用するKafkaクラスターのブローカーノードのアドレスを入力します。

タスクの結果

書き込みジョブが設定されます。