Spark Streamingジョブを使用してMongoDBでデータの読み書きを行う - 7.3

MongoDB

Version
7.3
Language
日本語 (日本)
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Open Studio for Big Data
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > サードパーティーシステム > NoSQLコンポーネント > MongoDB
データガバナンス > サードパーティーシステム > NoSQLコンポーネント > MongoDB
データクオリティとプレパレーション > サードパーティーシステム > NoSQLコンポーネント > MongoDB

このシナリオは、Talend Real Time Big Data PlatformおよびTalend Data Fabricにのみ適用されます。

Talendでサポートされているテクノロジーの詳細は、Talendコンポーネントを参照してください。

このシナリオでは、Spark Streamingジョブを作成して、特定の映画ディレクターに関するデータをMongoDBから抽出し、このデータを使用してフィルタリングを行い、映画情報を完成し、次に結果をMongoDBコレクションに書き込みます。

映画監督に関するサンプルデータは、以下のように読み取ります:
1;Gregg Araki	
2;P.J. Hogan 
3;Alan Rudolph 
4;Alex Proyas
5;Alex Sichel

このデータには、監督の名前と監督のID番号が含まれます。

MongoDB内のこのデータの構造は、次のとおりです。
{ "_id" : ObjectId("575546da3b1c7e22bc7b2189"), "person" : { "id" : 3, "name" : "Alan Rudolph" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b218b"), "person" : { "id" : 4, "name" : "Alex Proyas" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b218c"), "person" : { "id" : 5, "name" : "Alex Sichel" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b2188"), "person" : { "id" : 1, "name" : "Gregg Arakit" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b218a"), "person" : { "id" : 2, "name" : "P.J. Hogan" } }

サンプルデータはあくまでも例示用です。

tHDFSConfigurationはこのシナリオで、ジョブに依存するjarファイルの転送先となるHDFSシステムに接続するために、Sparkによって使用されます。

[Run] (実行)ビューの[Spark Configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使用されているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): tHDFSConfigurationtS3Configurationなど、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

前提条件:
  • 使用するSparkクラスターとMongoDBデータベースが正しくインストールされ、実行中であること。

  • 上記のデータが、使用するMongoDBコレクションにロードされていること。

このシナリオを複製するには、次の手順に従います。