Traiter des données de streaming d'avions - Cloud

Guide des processeurs de Talend Cloud Pipeline Designer

Version
Cloud
Language
Français (France)
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Création et développement > Création de Pipelines

Avant de commencer

  • Vous avez précédemment créé une connexion au système stockant vos données source.

  • Vous avez précédemment ajouté le jeu de données contenant vos données source.

    Ici, les données de streaming des avions comprennent l'identifiant des avions, leur position et la date et l'heure.

    Pour vous aider à comprendre ce scénario, vous trouverez ci-dessous le schéma Avro des données de streaming utilisées dans ce scénario :
    {
      "type": "record",
      "name": "aircrafts",
      "fields": [
      {"name": "Id", "type": "int"},
      {"name": "PosTime", "type": "long"},
      {"name": "Lat", "type": "double"},
      {"name": "Long", "type": "double"},
      {"name": "Op", "type": "string"}
      ]
    }

    Id correspond aux identifiants des avions, PosTime à la date et l'heure de la position, Lat/Long à la latitude et à la longitude des avions, et Op aux compagnies aériennes.

  • Vous avez créé la connexion et le jeu de données associé qui contiendra les données traitées.

    Ici, une table MySQL.

Procédure

  1. Cliquez sur Add pipeline (Ajouter un pipeline) dans la page Pipelines. Votre nouveau pipeline s’ouvre.
  2. Donnez-lui un nom significatif.

    Exemple

    Process streaming aircraft data
  3. Cliquez sur ADD SOURCE pour ouvrir le panneau vous permettant de sélectionner vos données source, ici le topic sur les avions sur Kafka.
  4. Sélectionnez votre jeu de données et cliquez sur Select (Sélectionner) pour l'ajouter au pipeline.
    Renommez-le si nécessaire.
  5. Cliquez sur et ajoutez un processeur Window au pipeline. Le panneau de configuration s'ouvre.
  6. Donnez un nom significatif au processeur.

    Exemple

    5sec window
  7. Dans la zone CONFIGURATION :
    1. Activez l'option Use Window session.
    2. Renseignez 5000 comme durée de fenêtrage pour capturer les données toutes les 5 secondes.
  8. Cliquez sur et ajoutez un processeur Aggregate au pipeline. Le panneau de configuration s'ouvre.
  9. Donnez un nom significatif au processeur.

    Exemple

    group by aircraft
  10. Dans la zone GROUP BY, sélectionnez les champs à utiliser pour votre ensemble d'agrégation : ici, customer_id.
    1. Sélectionnez .Id dans la liste Field pour grouper les avions par identifiant.
    2. Ajoutez un nouvel élément (NEW ELEMENT) et sélectionnez .Op dans la liste Field pour grouper les avions par compagnies aériennes.
  11. Dans la zone OPERATIONS :
    1. Sélectionnez .PosTime dans la liste Field et Maximum dans la liste Operation.
    2. Nommez le champ généré (Output field), date par exemple.
    3. Ajoutez un nouvel élément (NEW ELEMENT), sélectionnez .Lat dans la liste Field et List dans la liste Operation.
    4. Nommez le champ généré, lat par exemple.
    5. Ajoutez un nouvel élément (NEW ELEMENT), sélectionnez .Long dans la liste Field et List dans la liste Operation.
    6. Nommez le champ généré, lon par exemple.
  12. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
  13. Cliquez sur et ajoutez un processeur Field Selector au pipeline. Le panneau de configuration s'ouvre.
  14. Donnez un nom significatif au processeur.

    Exemple

    select latest position
  15. Dans la zone SELECTORS :
    1. Saisissez id dans la liste Output et .id dans la liste Input, puisque vous souhaitez sélectionner le champ id tout en le conservant au même emplacement.
    2. Ajoutez un nouvel élément (NEW ELEMENT) et saisissez airlines dans la liste Output et .Op dans la liste Input, puisque vous souhaitez sélectionner et renommer le champ Op.
    3. Ajoutez un nouvel élément (NEW ELEMENT) et saisissez date dans la liste Output et .date dans la liste Input, puisque vous souhaitez sélectionner le champ date tout en le conservant au même emplacement.
    4. Ajoutez un nouvel élément (NEW ELEMENT) et saisissez lat dans la liste Output et .lat[-1] dans la liste Input, puisque vous souhaitez sélectionner le champ lat du premier emplacement et le déplacer au niveau supérieur du schéma.
    5. Ajoutez un nouvel élément (NEW ELEMENT) et saisissez lon dans la liste Output et .lon[-1] dans la liste Input, puisque vous souhaitez sélectionner le champ lon du premier emplacement et le déplacer au niveau supérieur du schéma.

      Vous pouvez utiliser la syntaxe avpath dans cette zone.

  16. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
  17. Cliquez sur et ajoutez un processeur Python au pipeline. Le panneau de Configuration s’affiche.
  18. Donnez un nom significatif au processeur.

    Exemple

    compute geohash
  19. Sélectionnez Flat map dans la liste Flat map.
  20. Dans la zone Python code, saisissez  :
    def encode(latitude, longitude, precision=12):
      
        __base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
        __decodemap = { }
        for i in range(len(__base32)):
            __decodemap[__base32[i]] = i
        del i
    
        lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
        geohash = []
        bits = [ 16, 8, 4, 2, 1 ]
        bit = 0
        ch = 0
        even = True
        while len(geohash) < precision:
            if even:
                mid = (lon_interval[0] + lon_interval[1]) / 2
                if longitude > mid:
                    ch |= bits[bit]
                    lon_interval = (mid, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], mid)
            else:
                mid = (lat_interval[0] + lat_interval[1]) / 2
                if latitude > mid:
                    ch |= bits[bit]
                    lat_interval = (mid, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], mid)
            even = not even
            if bit < 4:
                bit += 1
            else:
                geohash += __base32[ch]
                bit = 0
                ch = 0
        return ''.join(geohash)
    
    output = json.loads("{}")
    output['id'] = input['id']
    output['airlines'] = input['airlines']
    output['date'] = input['date']
    output['location'] = encode(input['lat'], input['lon'])
    
    output.append(output)

    Ce code vous permet de calculer les geohashes (chaînes de caractères indiquant la position géographique résultant de la latitude et de la longitude).

  21. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
  22. Cliquez sur l'élément ADD DESTINATION et sélectionnez le jeu de données qui contiendra les données traitées.
    Renommez-le si nécessaire.
  23. (Facultatif) Examinez la prévisualisation du processeur Python afin de prévisualiser vos données.
  24. Dans la barre d'outils en haut de Talend Cloud Pipeline Designer, sélectionnez votre profil d'exécution dans la liste (pour plus d'informations, consultez Run profiles).
  25. Cliquez sur l'icône d'exécution pour exécuter votre pipeline.

Résultats

Votre pipeline de streaming est en cours d'exécution et sera exécuté jusqu'à ce que vous l'arrêtiez. Les données des avions sont modifiées et les informations des geohashes calculés sont envoyés au système cible que vous avez spécifié.