メイン コンテンツをスキップする 補完的コンテンツへスキップ

コンポーネントを設定

手順

  1. cTimerコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。下のように、オプションはデフォルト設定のままにしておきます。
  2. SetBodyAsJsonStringというラベルが付けられている最初のcSetBodyコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
  3. [Language] (言語)ドロップダウンリストで[Constant] (定数)を選択し、メッセージボディとして[Expression] (式)フィールドに"{\"my_field1\": 1,\"my_field2\": 72438939,\"my_field3\": \"my demo Test message\"}"というJSON文字列を入力します。
  4. ConvertBodyToAvroというラベルが付けられている最初のcProcessorコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
  5. [Code] (コード)フィールドに、メッセージボディ(JSON文字列)をAvroレコードに変換する以下のJavaコードを設定します。
    Object body = exchange.getMessage().getBody();
    exchange.getMessage().setBody(KafkaAvroConverterBean.jsonStringToAvro(body));
  6. [Advanced settings] (詳細設定)タブをクリックし、[Import] (インポート)フィールドに次のステートメントを入力します。local_projectTalendのプロジェクト名に置き換えます。
    import org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroConverterBean;
  7. ConvertBodyToJsonStringというラベルが付けられている2番目のcProcessorコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
  8. [Code] (コード)フィールドに、メッセージボディ(Avroレコード)をJSON文字列に変換する以下のJavaコードを設定します。
    Object body = exchange.getMessage().getBody();
    exchange.getMessage().setBody(KafkaAvroConverterBean.avroToJsonString(body));
  9. [Advanced settings] (詳細設定)タブをクリックし、[Import] (インポート)フィールドに次のステートメントを入力します。local_projectTalendのプロジェクト名に置き換えます。
    import org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroConverterBean;
  10. SendMessageToKafkaというラベルが付けられている最初のcKafkaコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
  11. [Broker List] (ブローカーリスト)フィールドに"localhost:9092"と入力します。
    [Topic] (トピック)フィールドに"demo.AVRO"と入力します。
    [Serializer Class] (シリアライザークラス)フィールドに、"org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroSerializerBean"と入力します。local_projectTalendのプロジェクト名に置き換えます。
    その他のオプションはデフォルト設定のままにしておきます。
  12. [Advanced settings] (詳細設定)タブをクリックします。[Kafka Properties] (Kafkaのプロパティ)フィールドで、"schemaRegistryURL"という名前と"localhost:8081"という値でパラメーターを追加します。
  13. ReceiveMessageFromKafkaというラベルが付けられている2番目のcKafkaコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
  14. [Broker List] (ブローカーリスト)フィールドに"localhost:9092"と入力します。
    [Topic] (トピック)フィールドに"demo.AVRO"と入力します。
    その他のオプションはデフォルト設定のままにしておきます。
  15. [Advanced settings] (詳細設定)タブをクリックします。[Kafka Properties] (Kafkaのプロパティ)フィールドに、"schemaRegistryURL"という名前と"localhost:8081"という値を持つパラメーターを1つ、そして"valueDeserializer"という名前と"org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroDeserializerBean"という値を持つパラメーターを1つ追加します。local_projectTalendのプロジェクト名に置き換えます。
  16. メッセージ交換をログするcLogコンポーネントはデフォルト設定のままにしておきます。
  17. Ctrl + Sを押してジョブを保存します。

このページは役に立ちましたか?

このページまたはコンテンツに、タイポ、ステップの省略、技術的エラーなどの問題が見つかった場合は、お知らせください。改善に役立たせていただきます。