メイン コンテンツをスキップする 補完的コンテンツへスキップ

ConsumerRecordからAvroデータを読み取り

このタスクについて

読み取りジョブを設定するために使われます。

手順

  1. 読み取りジョブでtKafkaInputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Output type] (出力タイプ)ドロップダウンリストで、ConsumerRecordを選択します。
      ConsumerRecordを使用する場合、AvroレコードはStudio Talendで次のようにオブジェクトとして分類されます。
    2. [Version] (バージョン)ドロップダウンリストで、使用するKafkaクラスターのバージョンを選択します。
    3. [Broker list] (ブローカーリスト)フィールドに、使用するKafkaクラスターのブローカーノードのアドレスを入力します。
    4. [Topic name] (トピック名)フィールドに、tKafkaInputがメッセージフィードを受信する元となるトピックの名前を入力します。
    5. [Consumer group id] (コンシューマーグループID)フィールドに、tKafkaInputの所属先としたいコンシューマーグループの名前を入力します。
  2. tJavaRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Edit schema] (スキーマを編集)の横にある[...]ボタンをクリックし、[Schema] (スキーマ)ダイアログボックスを開きます。
    2. [+]ボタンをクリックしてカラムを追加し、そのカラムに名前を付けます。例:
    3. [OK]をクリックしてこれらの変更を確認し、ポップアップ表示されるダイアログボックスで求められるプロパゲーションを承認します。
    4. [Code] (コード)フィールドにJavaコードを入力してコンテンツを抽出します。例:
      org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;
      
      output_row.topic = record.topic();
      output_row.partition = record.partition();
      output_row.offset = record.offset();
      output_row.timestamp = record.timestamp();
      output_row.timestampType = record.timestampType();
      
      output_row.header1 = record.headers().lastHeader("header1").value();
      output_row.header2 = record.headers().lastHeader("header2").value();
      
      output_row.key = (byte[]) record.key();
      output_row.value = (byte[]) record.value();
  3. tLogRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Mode] (モード)エリアで、[Table (print values in cells of a table)] (テーブル(テーブルのセルの出力値))を選択して結果を読みやすくします。

タスクの結果

読み取りジョブが設定されます。

このページは役に立ちましたか?

このページまたはコンテンツに、タイポ、ステップの省略、技術的エラーなどの問題が見つかった場合は、お知らせください。改善に役立たせていただきます。