Apache Spark StreamingのtJavaプロパティ - 7.3

Java custom code

EnrichVersion
Cloud
7.3
EnrichProdName
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for Big Data
Talend Open Studio for Data Integration
Talend Open Studio for ESB
Talend Real-Time Big Data Platform
EnrichPlatform
Talend Studio
task
ジョブデザインと開発 > サードパーティーシステム > カスタムコードコンポーネント > Javaカスタムコードコンポーネント
データガバナンス > サードパーティーシステム > カスタムコードコンポーネント > Javaカスタムコードコンポーネント
データクオリティとプレパレーション > サードパーティーシステム > カスタムコードコンポーネント > Javaカスタムコードコンポーネント

これらのプロパティを使って、Spark Streamingジョブフレームワーク内で実行されているtJavaを設定します。

Spark StreamingtJavaコンポーネントはカスタムコードファミリーのコンポーネントです。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで使用できます。

基本設定

[Schema] (スキーマ)および[Edit Schema] (スキーマの編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

スキーマを変更するには[Edit schema] (スキーマの編集)をクリックします。現在のスキーマがリポジトリータイプの場合は、3つのオプションを使用できます。

  • [View schema] (スキーマの表示): スキーマのみを表示する場合は、このオプションを選択します。

  • [Change to built-in property] (組み込みのプロパティに変更): ローカルで変更を行うためにスキーマを組み込みに変更する場合は、このオプションを選択します。

  • [Update repository connection] (リポジトリー接続を更新): リポジトリーに保存されているスキーマに変更を加え、変更後にそのコンテンツをすべてのジョブにプロパゲートするかどうかを決める場合は、このオプションを選択します。変更を現在のジョブにのみ反映する場合は、変更後、[No] (いいえ)を選択し、[Repository Content] (リポジトリーのコンテンツ)ウィンドウで再びこのスキーマのメタデータを選択します。

null不可能なプリミティブフィールドの入力値がnullの場合、そのフィールドを含むデータ行は拒否されることに注意してください。

 

[Built-in] (組み込み): そのコンポーネントのみのスキーマを作成して、ローカルに保存します。

 

[Repository] (リポジトリー): スキーマは作成済みで、リポジトリーに保管されています。さまざまなプロジェクトやジョブデザインで再利用できます。

[Code] (コード)

入力リンクからのRDDを処理するため、または新しいRDDをこの入力リンクから作成するためのJavaコードを入力します。

スキーマ、リンクとコンポーネント名を利用してカスタムコードを作成する必要があります。たとえば、このコンポーネントのラベルがtJava_1で、tJava_1への接続のラベルがrow1である場合、入力RDDのクラスはrow1Structで。入力RDD自体はrdd_tJava_1変数と共に利用できます。

詳細な手順は、このコンポーネントの[Code] (コード)フィールドに記載のデフォルトのコメントを参照してください。

SparkのJava APIの詳細は、ApacheのSparkのドキュメント(https://spark.apache.org/docs/latest/api/java/index.html)を参照してください。

詳細設定

Classes (クラス)

[Basic settings] (基本設定)ビューの[Code] (コード)フィールドに書き込まれたコード内で使用する必要のあるクラスを定義します。

シリアライズで最終的な例外の発生を避けるために、[Code] (コード)フィールド内ではなく、このフィールド内で新しいクラスを定義することをお勧めします。

[Import] (インポート)

インポートするJavaコード、および必要に応じて[Basic settings] (基本設定)ビューの[Code] (コード)フィールドで使用されている外部ライブラリーを入力します。

使用方法

使用ルール

このコンポーネントは終了コンポーネントとして使用され、入力リンクを必要とします。

コードサンプル [Basic settings] (基本設定)ビューの[Code] (コード)フィールドに次のコードを入力し、入力RDDにカスタム変換を使用して、出力RDDを作成します。mapInToOutは、[Advanced settings] (詳細設定)ビューの[Classes] (クラス)フィールドで定義するクラスです。
outputrdd_tJava_1 = rdd_tJava_1.map(new mapInToOut(job));
[Advanced settings] (詳細設定)ビューの[Classes] (クラス)フィールドに次のコードを入力して、mapInToOutクラスを定義します。
public static class mapInToOut implements
  org.apache.spark.api.java.function.Function<inputStruct,RecordOut_tJava_1>{

     private ContextProperties context = null;
     private java.util.List<org.apache.avro.Schema.Field> fieldsList;
		
     public mapInToOut(JobConf job) {
	   this.context = new ContextProperties(job);
     }
		
     @Override
     public RecordOut_tJava_1 call(inputStruct origStruct) {		
			
	 if (fieldsList == null) {
	     this.fieldsList = (new inputStruct()).getSchema()
			.getFields();
	 }

	 RecordOut_tJava_1 value = new RecordOut_tJava_1();

	 for (org.apache.avro.Schema.Field field : fieldsList) {
	      value.put(field.pos(), origStruct.get(field.pos()));
	 }

	 return value;		
			
      }
}

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark Configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): tHDFSConfigurationまたはtS3Configurationなど、クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。