ストリーミング航空機データの処理 - Cloud

Talend Cloud Pipeline Designerプロセッサーガイド

Version
Cloud
Language
日本語
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
ジョブデザインと開発 > パイプラインのデザイン
Last publication date
2024-02-26

Kafkaソース、Windowプロセッサー、Aggregateプロセッサー、Field selectorプロセッサー、Python 3プロセッサー、MySQLデスティネーションを伴うパイプライン。

始める前に

  • ソースデータを保管するシステムへの接続が作成済みであること。

  • ソースデータを保管するデータセットが追加済みであること。

    ここでは、航空機のID、位置、タイムスタンプなどの航空機データをストリーミングします。

    このシナリオを理解するために、このシナリオで使用されるストリーミングデータのAVROスキーマを次に示します:
    {
      "type": "record",
      "name": "aircrafts",
      "fields": [
      {"name": "Id", "type": "int"},
      {"name": "PosTime", "type": "long"},
      {"name": "Lat", "type": "double"},
      {"name": "Long", "type": "double"},
      {"name": "Op", "type": "string"}
      ]
    }

    Idが航空機の識別子に対応している場合、PosTimeは位置のタイムスタンプに対応し、Lat/Longは航空機の緯度/経度に対応し、Opは航空会社に対応しています。

  • 接続および処理済みデータを保管する関連データセットも作成済みであること。

    ここでは、MySQLテーブルを使用します。

手順

  1. [Pipelines] (パイプライン)ページで[Add pipeline] (パイプラインを追加)をクリックします。新しいパイプラインが開きます。
  2. パイプラインに意味のある名前を付けます。

    ストリーミング航空機データの処理
  3. [ADD SOURCE] (ソースを追加)をクリックしてパネルを開きます。このパネルで、ソースデータ(この場合はkafkaの航空機トピック)を選択できます。
    航空機レコードを持つデータサンプルのプレビュー。
  4. データセットを選択し、[Select] (選択)をクリックしてパイプラインに追加できるようにします。
    必要であれば名前を変更します。
  5. +をクリックし、パイプラインにWindowプロセッサーを追加します。設定パネルが開きます。
  6. プロセッサーに意味のある名前を付けます。

    5sec window
  7. [Configuration] (設定)エリアで以下の操作を行います。
    1. [Use Window session] (ウィンドウセッションを使用)トグルを有効にします。
    2. ウィンドウ継続時間として5000を入力し、5秒ごとにデータをキャプチャーできるようにします。
  8. +をクリックし、パイプラインにAggregateプロセッサーを追加します。設定パネルが開きます。
  9. プロセッサーに意味のある名前を付けます。

    group by aircraft
  10. [Group by] (グループ基準)エリアで、集計セットに使用するフィールドを選択します。ここでは、.customerIdになります。
    1. 航空機識別子でグルーピングするには、[Field] (フィールド)リストで.Idを選択します。
    2. [New element] (新規エレメント) 追加し、[Field] (フィールド)リストで.Opを選択して航空会社ごとにグルーピングします。
  11. [Operations] (操作)エリアで次の操作を行います。
    1. [Field] (フィールド)リストで.PosTime[Operation] (操作)リストでMaximumを選択します。
    2. 生成されたフィールド([Output field] (出力フィールド))に、たとえばdateという名前を付けます。
    3. +記号をクリックして新しいエレメントを追加し、[Field] (フィールド)リストで.Latを、[Operation] (操作)リストで[List] (リスト)をそれぞれ入力します。
    4. 生成されたフィールドに、たとえばlatという名前を付けます。
    5. +記号をクリックして新しいエレメントを追加し、[Field] (フィールド)リストで.Longを、[Operation] (操作)リストで[List] (リスト)をそれぞれ入力します。
    6. 生成されたフィールドに、たとえば lonという名前を付けます。
  12. [Save] (保存)をクリックして設定を保存します。
  13. +をクリックし、パイプラインにField Selectorプロセッサーを追加します。設定パネルが開きます。
  14. プロセッサーに意味のある名前を付けます。

    select latest position
  15. [Advanced] (詳細設定)モードの[Selectors] (セレクター)エリアで:
    1. idフィールドを同じ場所に置いたまま選択する場合は、[Output] (出力)リストにid、そして[Input] (入力)リストに.idとそれぞれ入力します。
    2. Opフィールドを選択して名前を変更する場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにairlinesと、[Input] (入力)リストに.Opとそれぞれ入力します。
    3. dateフィールドを選択して同じ場所に保持しておく場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにdateと、[Input] (入力)リストに.dateとそれぞれ入力します。
    4. 最初の場所のlatフィールドを選択してスキーマのトップレベルに移動させる場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにlatと、[Input] (入力)リストに.lat[-1]とそれぞれ入力します。
    5. 最初の場所のlonフィールドを選択してスキーマのトップレベルに移動させる場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにlonと、[Input] (入力)リストに.lon[-1]とそれぞれ入力します。

      このエリアではavpath構文を使用できます。

  16. [Save] (保存)をクリックして設定を保存します。
  17. +をクリックし、パイプラインにPython 3プロセッサーを追加します。設定パネルが開きます。
  18. プロセッサーに意味のある名前を付けます。

    ジオハッシュの計算
  19. [Python 3 code] (Python 3コード)エリアに、以下を入力します:
    def encode(latitude, longitude, precision=12):
      
        __base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
        __decodemap = { }
        for i in range(len(__base32)):
            __decodemap[__base32[i]] = i
        del i
    
        lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
        geohash = []
        bits = [ 16, 8, 4, 2, 1 ]
        bit = 0
        ch = 0
        even = True
        while len(geohash) < precision:
            if even:
                mid = (lon_interval[0] + lon_interval[1]) / 2
                if longitude > mid:
                    ch |= bits[bit]
                    lon_interval = (mid, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], mid)
            else:
                mid = (lat_interval[0] + lat_interval[1]) / 2
                if latitude > mid:
                    ch |= bits[bit]
                    lat_interval = (mid, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], mid)
            even = not even
            if bit < 4:
                bit += 1
            else:
                geohash += __base32[ch]
                bit = 0
                ch = 0
        return ''.join(geohash)
    
    output = json.loads("{}")
    output['id'] = input['id']
    output['airlines'] = input['airlines']
    output['date'] = input['date']
    output['location'] = encode(input['lat'], input['lon'])
    
    output.append(output)

    このコードを使用すると、ジオハッシュ(緯度と経度から得られる文字列表示の地理的位置)を計算できます。

  20. [Save] (保存)をクリックして設定を保存します。
  21. [ADD DESTINATION] (デスティネーションを追加)をクリックし、処理済みデータを保持するデータセットを選択します。
    必要であれば名前を変更します。
  22. (オプション) Python 3プロセッサーのプレビューを表示してデータをプレビューします。
    ジオハッシュ情報を計算した後のPython 3プロセッサーのプレビュー。
  23. Talend Cloud Pipeline Designerの上部ツールバーで[Run] (実行)ボタンをクリックするとパネルが開き、実行プロファイルを選択できるようになります。
  24. リストで実行プロファイルを選択し(詳細は実行プロファイルをご覧ください)、[Run] (実行)をクリックしてパイプラインを実行します。

タスクの結果

ストリーミングパイプラインは実行中となり、強制終了するまで実行されます。航空機のデータが変更され、計算したジオハッシュ情報が、指定したターゲットシステムに送信されます。