Processing streaming aircraft data - Cloud

Talend Cloud Pipeline Designer Processors Guide

Before you begin

  • You have previously created a connection to the system storing your source data.

  • You have previously added the dataset holding your source data.

    Here, streaming aircraft data including aircraft IDs, positions, and timestamps.

    To help you understand this scenario, here is the Avro schema of the streaming data:
    {
      "type": "record",
      "name": "aircrafts",
      "fields": [
        {"name": "Id", "type": "int"},
        {"name": "PosTime", "type": "long"},
        {"name": "Lat", "type": "double"},
        {"name": "Long", "type": "double"},
        {"name": "Op", "type": "string"}
      ]
    }

    where Id corresponds to aircraft identifiers, PosTime to the timestamp of the position, Lat/Long to the aircraft latitude/longitude, and Op to airline companies.
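
    For illustration, a single record conforming to this schema could look like the following (all values are hypothetical):

    {
      "Id": 7078245,
      "PosTime": 1523110400000,
      "Lat": 48.8566,
      "Long": 2.3522,
      "Op": "Air France"
    }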

  • You also have created the connection and the related dataset that will hold the processed data.

    Here, a MySQL table.

Procedure

  1. Click ADD PIPELINE on the PIPELINES page. Your new pipeline opens.
  2. Give the pipeline a meaningful name.

    Example

    Process streaming aircraft data
  3. Click ADD SOURCE to open the panel allowing you to select your source data, here the aircraft topic on Kafka.
  4. Select your dataset and click SELECT DATASET in order to add it to the pipeline.
    Rename it if needed.
  5. Click and add a Window processor to the pipeline. The configuration panel opens.
  6. Give a meaningful name to the processor.

    Example

    5sec window
  7. In the CONFIGURATION area:
    1. Enable the Use Window session toggle.
    2. Type in 5000 as the window duration in order to capture data every 5 seconds (the duration is expressed in milliseconds).
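
    To make the setting concrete, here is a minimal plain-Python sketch (not processor code) of how records fall into fixed 5000-millisecond windows based on their PosTime value; the function and sample timestamps are illustrative assumptions:

    WINDOW_MS = 5000  # the window duration configured above

    def window_start(pos_time_ms, window_ms=WINDOW_MS):
        # Start of the fixed window this timestamp falls into.
        return pos_time_ms - (pos_time_ms % window_ms)

    # Hypothetical epoch-millisecond timestamps
    for t in [1523110400120, 1523110402900, 1523110405010]:
        print(t, '->', window_start(t))
    # The first two share a window; the third opens the next one.
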
  8. Click and add an Aggregate processor to the pipeline. The configuration panel opens.
  9. Give a meaningful name to the processor.

    Example

    group by aircraft
  10. In the GROUP BY area, select the fields you want to use for your aggregation set, here .Id and .Op:
    1. Select .Id in the Field list to group by aircraft identifiers.
    2. Add a NEW ELEMENT and select .Op in the Field list to group by airline companies.
  11. In the OPERATIONS area:
    1. Select .PosTime in the Field list and Maximum in the Operation list.
    2. Name the generated field (Output field), date for example.
    3. Add a NEW ELEMENT, select .Lat in the Field list and List in the Operation list.
    4. Name the generated field, lat for example.
    5. Add a NEW ELEMENT, select .Long in the Field list and List in the Operation list.
    6. Name the generated field, lon for example.
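
    As a mental model only, the configuration above behaves like this plain-Python sketch (the grouping code and sample records are illustrative assumptions, not the processor's actual implementation):

    from collections import defaultdict

    # Hypothetical records from one 5-second window
    records = [
        {'Id': 7078245, 'PosTime': 1523110400120, 'Lat': 48.85, 'Long': 2.35, 'Op': 'Air France'},
        {'Id': 7078245, 'PosTime': 1523110402900, 'Lat': 48.86, 'Long': 2.36, 'Op': 'Air France'},
    ]

    groups = defaultdict(list)
    for r in records:
        groups[(r['Id'], r['Op'])].append(r)  # GROUP BY .Id and .Op

    for (aircraft_id, airline), rows in groups.items():
        print({
            'Id': aircraft_id,
            'Op': airline,
            'date': max(r['PosTime'] for r in rows),  # Maximum of .PosTime
            'lat': [r['Lat'] for r in rows],          # List of .Lat
            'lon': [r['Long'] for r in rows],         # List of .Long
        })
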
  12. Click SAVE to save your configuration.
  13. Click and add a Field Selector processor to the pipeline. The configuration panel opens.
  14. Give a meaningful name to the processor.

    Example

    select latest position
  15. In the SELECTORS area:
    1. Enter id in the Field name list and .id in the Path list, as you want to select the id field while keeping it at the same location.
    2. Add a NEW ELEMENT and enter airlines in the Field name list and .Op in the Path list, as you want to select and rename the Op field.
    3. Add a NEW ELEMENT and enter date in the Field name list and .date in the Path list, as you want to select the date field while keeping it at the same location.
    4. Add a NEW ELEMENT and enter lat in the Field name list and type .lat[-1] in the Path list, as you want to select the last element of the lat list (the latest latitude recorded in the window).
    5. Add a NEW ELEMENT and enter lon in the Field name list and type .lon[-1] in the Path list, as you want to select the last element of the lon list (the latest longitude recorded in the window).

      You can use the avpath syntax in this area.
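
      For instance, the [-1] index selects the last element of a list, much like negative indexing in Python; the record below is a hypothetical Aggregate output:

      record = {'lat': [48.85, 48.86, 48.87]}
      latest_lat = record['lat'][-1]  # what .lat[-1] resolves to
      print(latest_lat)               # 48.87, the most recent value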

  16. Click SAVE to save your configuration.
  17. Click and add a Python processor to the pipeline. The Configuration panel opens.
  18. Give a meaningful name to the processor.

    Example

    compute geohash
  19. Select Flat map in the Map type list.
  20. In the Python code area, type in:
    def encode(latitude, longitude, precision=12):
        # Geohash base-32 alphabet.
        __base32 = '0123456789bcdefghjkmnpqrstuvwxyz'

        lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
        geohash = []
        bits = [16, 8, 4, 2, 1]
        bit = 0
        ch = 0
        even = True
        # Halve the longitude interval on even bits and the latitude
        # interval on odd bits; every 5 bits yield one base-32 character.
        while len(geohash) < precision:
            if even:
                mid = (lon_interval[0] + lon_interval[1]) / 2
                if longitude > mid:
                    ch |= bits[bit]
                    lon_interval = (mid, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], mid)
            else:
                mid = (lat_interval[0] + lat_interval[1]) / 2
                if latitude > mid:
                    ch |= bits[bit]
                    lat_interval = (mid, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], mid)
            even = not even
            if bit < 4:
                bit += 1
            else:
                geohash += __base32[ch]
                bit = 0
                ch = 0
        return ''.join(geohash)

    # input, output and outputList are provided by the Python processor.
    output = json.loads("{}")
    output['id'] = input['id']
    output['airlines'] = input['airlines']
    output['date'] = input['date']
    output['location'] = encode(input['lat'], input['lon'])

    outputList.append(output)

    This code calculates geohashes (short strings encoding a geographic location, derived from the latitude and longitude).
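
    To sanity-check the function outside the pipeline (assuming you copy encode into a local script), you can rely on the property that nearby coordinates typically share a geohash prefix; the coordinates below are arbitrary examples:

    print(encode(48.8566, 2.3522))     # central Paris
    print(encode(48.8570, 2.3530))     # a few hundred metres away: long shared prefix
    print(encode(-33.8688, 151.2093))  # Sydney: an entirely different prefix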

  21. Click SAVE to save your configuration.
  22. Click the ADD DESTINATION item and select the dataset that will hold your processed data.
    Rename it if needed.
  23. (Optional) Click the preview icon located after the Python processor to preview your data.
  24. On the top toolbar of Talend Cloud Pipeline Designer, select your run profile in the list (for more information, see Run profiles).
  25. Click the run icon to run your pipeline.

Results

Your streaming pipeline is now executing and will run until you decide to terminate it. The aircraft data is transformed and the records, enriched with the calculated geohash, are sent to the target system you have indicated.