ストリーミング航空機データの処理 - Cloud

Talend Cloud Pipeline Designerプロセッサーガイド

author
Talend Documentation Team
EnrichVersion
Cloud
EnrichProdName
Talend Cloud
task
ジョブデザインと開発 > パイプラインのデザイン
EnrichPlatform
Talend Pipeline Designer

始める前に

  • ソースデータを保管するシステムへの接続が作成済みであること。

  • ソースデータを保管するデータセットが追加済みであること。

    ここでは、航空機のID、位置、タイムスタンプなどの航空機データをストリーミングします。

    このシナリオを理解するために、このシナリオで使用されるストリーミングデータのAVROスキーマを次に示します:
    {
      "type": "record",
      "name": "aircrafts",
      "fields": [
      {"name": "Id", "type": "int"},
      {"name": "PosTime", "type": "long"},
      {"name": "Lat", "type": "double"},
      {"name": "Long", "type": "double"},
      {"name": "Op", "type": "string"}
      ]
    }

    Idが航空機の識別子に対応している場合、PosTimeは位置のタイムスタンプに対応し、Lat/Longは航空機の緯度/経度に対応し、Opは航空会社に対応しています。

  • 接続および処理済みデータを保管する関連データセットも作成済みであること。

    ここでは、MySQL テーブルになります。

手順

  1. [Pipelines] (パイプライン)ページで[ADD PIPELINE] (パイプラインの追加)をクリックします。新しいパイプラインが開きます。
  2. パイプラインに意味のある名前を付けます。

    例え

    ストリーミング航空機データの処理
  3. [ADD SOURCE] (ソースの追加)をクリックしてパネルを開くと、ここではkafkaの航空機トピックでソースデータを選択できます。
  4. データセットを選択し、[SELECT] (選択)をクリックしてパイプラインに追加します。
    必要であれば名前を変更します。
  5. をクリックしてパイプラインにWindowプロセッサーを追加します。設定パネルが開きます。
  6. プロセッサーに意味のある名前を付けます。

    例え

    5sec window
  7. [CONFIGURATION] (運用設定)エリアで以下の操作を行います。
    1. [Use Window session] (ウィンドウセッションを使用)トグルを有効にします。
    2. 5秒ごとにデータをキャプチャするために、ウィンドウ継続時間として5000を入力します。
  8. をクリックして、パイプラインに[Aggregate] (集計)プロセッサーを追加します。設定パネルが開きます。
  9. プロセッサーに意味のある名前を付けます。

    例え

    group by aircraft
  10. [GROUP BY] (分類基準)エリアで、集計セットに使用するフィールドを選択します: ここでは.customerIdになります。
    1. 航空機識別子でグループ化するには、[Field] (フィールド)リストで.Idを選択します。
    2. [NEW ELEMENT] (新規エレメント) 追加し、[Field] (フィールド)リストで.Opを選択して航空会社ごとにグループ化します。
  11. [OPERATIONS] (オペレーション)エリアで以下のようにします:
    1. [Field] (フィールド)リストで.PosTime[Operation] (オペレーション)リストでMaximumを選択します。
    2. 生成されたフィールド([Output field] (出力フィールド))に、たとえばdateという名前を付けます。
    3. [NEW ELEMENT] (新規エレメント)追加し、[Field] (フィールド)リストで.Lat[Operation] (オペレーション)リストでListを選択します。
    4. 生成されたフィールドに、たとえばlatという名前を付けます。
    5. [NEW ELEMENT] (新規エレメント)追加し、[Field] (フィールド)リストで.Long[Operation] (オペレーション)リストで[List] (リスト)を選択します。
    6. 生成されたフィールドに、たとえば lonという名前を付けます。
  12. [Save] (保存)をクリックして設定を保存します。
  13. をクリックして、パイプラインに[Field Selector] (フィルターセレクター)プロセッサーを追加します。設定パネルが開きます。
  14. プロセッサーに意味のある名前を付けます。

    例え

    select latest position
  15. [SELECTORS] (セレクター)エリアで以下のようにします:
    1. idフィールドを同じ場所に置いたまま選択する場合は、[Output] (出力)リストにid、そして[Input] (入力)リストに.idとそれぞれ入力します。
    2. [NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにairlines、そして[Input] (入力)リストに.Opとそれぞれ入力して、[Op]フィールドを選択して名前を変更します。
    3. dateフィールドを同じ場所に配置したまま選択する場合は、[NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにdate、そして[Inputh] (入力)リストに.dateとそれぞれ入力します。
    4. 元の場所のlatフィールドを選択してスキーマの下位レベルに移動させる場合は、[NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにlat、そして[Input] (入力)リストに.lat[-1]とそれぞれ入力します。
    5. 元の場所のlonフィールドを選択してスキーマの下位レベルに移動させる場合は、[NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにlon、そして[Input] (入力)リストに.lon[-1]とそれぞれ入力します。

      このエリアではavpath構文を使用できます。

  16. [Save] (保存)をクリックして設定を保存します。
  17. をクリックして、パイプラインに[Python ] (Python行)プロセッサーを追加します。[Configuration] (運用設定)パネルが開きます。
  18. プロセッサーに意味のある名前を付けます。

    例え

    ジオハッシュの計算
  19. [Flat map] (フラットマップ)リストで[Flat map] (フラットマップ)を選択します。
  20. [Python code] (Pythonコード)エリアに、以下を入力します。
    def encode(latitude, longitude, precision=12):
      
        __base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
        __decodemap = { }
        for i in range(len(__base32)):
            __decodemap[__base32[i]] = i
        del i
    
        lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
        geohash = []
        bits = [ 16, 8, 4, 2, 1 ]
        bit = 0
        ch = 0
        even = True
        while len(geohash) < precision:
            if even:
                mid = (lon_interval[0] + lon_interval[1]) / 2
                if longitude > mid:
                    ch |= bits[bit]
                    lon_interval = (mid, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], mid)
            else:
                mid = (lat_interval[0] + lat_interval[1]) / 2
                if latitude > mid:
                    ch |= bits[bit]
                    lat_interval = (mid, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], mid)
            even = not even
            if bit < 4:
                bit += 1
            else:
                geohash += __base32[ch]
                bit = 0
                ch = 0
        return ''.join(geohash)
    
    output = json.loads("{}")
    output['id'] = input['id']
    output['airlines'] = input['airlines']
    output['date'] = input['date']
    output['location'] = encode(input['lat'], input['lon'])
    
    output.append(output)

    このコードを使用すると、ジオハッシュ(緯度と経度から得られる文字列表示の地理的位置)を計算できます。

  21. [Save] (保存)をクリックして設定を保存します。
  22. [ADD DESTINATION] (デスティネーションの追加)アイテムをクリックし、処理済みデータを保存するデータセットを選択します。
    必要であれば名前を変更します。
  23. (オプション) [Python]プロセッサーのプレビューを表示してデータをプレビューします。
  24. Talend Cloud Pipeline Designerの上部のツールバーで、リストから実行プロファイルを選択します(詳細は実行プロファイルを参照してください)。
  25. 実行アイコンをクリックしてパイプラインを実行します。

タスクの結果

ストリーミングパイプラインは実行中で、強制終了するまで実行されます。航空機のデータが変更され、計算したジオハッシュ情報が、指定したターゲットシステムに送信されます。