Traiter des données de streaming d'avions - Cloud

Guide des processeurs de Talend Cloud Pipeline Designer

Version
Cloud
Language
Français
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Création et développement > Création de Pipelines
Last publication date
2024-02-27

Un pipeline avec une source Kafka, un processeur Window, un processeur Aggregate, un processeur Filter selector, un processeur Python 3 et une destination MySQL.

Avant de commencer

  • Vous avez précédemment créé une connexion au système stockant vos données source.

  • Vous avez précédemment ajouté le jeu de données contenant vos données source.

    Ici, les données de streaming des avions comprennent l'identifiant des avions, leur position et la date et l'heure.

    Pour vous aider à comprendre ce scénario, vous trouverez ci-dessous le schéma Avro des données de streaming utilisées dans ce scénario :
    {
      "type": "record",
      "name": "aircrafts",
      "fields": [
      {"name": "Id", "type": "int"},
      {"name": "PosTime", "type": "long"},
      {"name": "Lat", "type": "double"},
      {"name": "Long", "type": "double"},
      {"name": "Op", "type": "string"}
      ]
    }

    Id correspond aux identifiants des avions, PosTime à la date et l'heure de la position, Lat/Long à la latitude et à la longitude des avions, et Op aux compagnies aériennes.

  • Vous avez créé la connexion et le jeu de données associé qui contiendra les données traitées.

    Ici, une table MySQL.

Procédure

  1. Cliquez sur Add pipeline (Ajouter un pipeline) dans la page Pipelines. Votre nouveau pipeline s’ouvre.
  2. Donnez-lui un nom significatif.

    Exemple

    Process streaming aircraft data
  3. Cliquez sur ADD SOURCE pour ouvrir le panneau vous permettant de sélectionner vos données source, ici le topic sur les avions sur Kafka.
    Aperçu d'un échantillon de données avec des enregistrements relatifs aux avions.
  4. Sélectionnez votre jeu de données et cliquez sur Select (Sélectionner) pour l'ajouter au pipeline.
    Renommez-le si nécessaire.
  5. Cliquez sur le bouton + et ajoutez un processeur Window (Fenêtre) au pipeline. Le panneau de configuration s'ouvre.
  6. Donnez un nom significatif au processeur.

    Exemple

    5sec window
  7. Dans la zone Configuration :
    1. Activez l'option Use Window session.
    2. Renseignez 5000 comme durée de fenêtrage pour capturer les données toutes les 5 secondes.
  8. Cliquez sur le bouton + et ajoutez un processeur Aggregate au pipeline. Le panneau de configuration s'ouvre.
  9. Donnez un nom significatif au processeur.

    Exemple

    group by aircraft
  10. Dans la zone Group by, sélectionnez les champs à utiliser pour votre jeu d'agrégation : ici, customer_id.
    1. Sélectionnez .Id dans la liste Field pour grouper les avions par identifiant.
    2. Ajoutez un nouvel élément (New element) et sélectionnez .Op dans la liste Field pour grouper les avions par compagnies aériennes.
  11. Dans la zone Operations :
    1. Sélectionnez .PosTime dans la liste Field et Maximum dans la liste Operation.
    2. Nommez le champ généré (Output field), date par exemple.
    3. Cliquez sur le bouton + pour ajouter un élément, sélectionnez .Lat dans la liste Field path et sélectionnez List dans la liste Operation.
    4. Nommez le champ généré, lat par exemple.
    5. Cliquez sur le bouton + pour ajouter un élément, sélectionnez .Long dans la liste Field path et sélectionnez List dans la liste Operation.
    6. Nommez le champ généré, lon par exemple.
  12. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
  13. Cliquez sur le bouton + et ajoutez un processeur Field Selector au pipeline. Le panneau de configuration s'ouvre.
  14. Donnez un nom significatif au processeur.

    Exemple

    select latest position
  15. Dans la zone Selectors (Sélecteurs) du mode Advanced (Avancé) :
    1. Saisissez id dans la liste Output et .id dans la liste Input, puisque vous souhaitez sélectionner le champ id tout en le conservant au même emplacement.
    2. Cliquez sur le bouton + pour ajouter un élément, saisissez airlines dans la liste Output et .Op dans la liste Input, car vous souhaitez sélectionner et renommer le champ Op.
    3. Cliquez sur le bouton + pour ajouter un élément, saisissez date dans la liste Output et .date dans la liste Input, car vous souhaitez sélectionner le champ date en conservant son emplacement.
    4. Cliquez sur le bouton + pour ajouter un élément, saisissez lat dans la liste Output et saisissez .lat[-1] dans la liste Input, car vous souhaitez sélectionner le champ lat de l'emplacement original et le déplacer vers un niveau inférieur dans le schéma.
    5. Cliquez sur le bouton + pour ajouter un élément, saisissez lon dans la liste Output et saisissez .lon[-1] dans la liste Input, car vous souhaitez sélectionner le champ lon de l'emplacement original et le déplacer vers un niveau inférieur dans le schéma.

      Vous pouvez utiliser la syntaxe avpath dans cette zone.

  16. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
  17. Cliquez sur le bouton + et ajoutez un processeur Python 3 au pipeline. Le panneau de configuration s'ouvre.
  18. Donnez un nom significatif au processeur.

    Exemple

    compute geohash
  19. Dans la zone Python 3 code, saisissez :
    def encode(latitude, longitude, precision=12):
      
        __base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
        __decodemap = { }
        for i in range(len(__base32)):
            __decodemap[__base32[i]] = i
        del i
    
        lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
        geohash = []
        bits = [ 16, 8, 4, 2, 1 ]
        bit = 0
        ch = 0
        even = True
        while len(geohash) < precision:
            if even:
                mid = (lon_interval[0] + lon_interval[1]) / 2
                if longitude > mid:
                    ch |= bits[bit]
                    lon_interval = (mid, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], mid)
            else:
                mid = (lat_interval[0] + lat_interval[1]) / 2
                if latitude > mid:
                    ch |= bits[bit]
                    lat_interval = (mid, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], mid)
            even = not even
            if bit < 4:
                bit += 1
            else:
                geohash += __base32[ch]
                bit = 0
                ch = 0
        return ''.join(geohash)
    
    output = json.loads("{}")
    output['id'] = input['id']
    output['airlines'] = input['airlines']
    output['date'] = input['date']
    output['location'] = encode(input['lat'], input['lon'])
    
    output.append(output)

    Ce code vous permet de calculer les geohashes (chaînes de caractères indiquant la position géographique résultant de la latitude et de la longitude).

  20. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
  21. Cliquez sur ADD DESTINATION (AJOUTER UNE DESTINATION) et sélectionnez le jeu de données qui contiendra les données traitées.
    Renommez-le si nécessaire.
  22. (Facultatif) Examinez la prévisualisation du processeur Python 3 afin de prévisualiser vos données.
    Aperçu du processeur Python 3 après calcul des informations de geohash.
  23. Dans la barre d'outils en haut de Talend Cloud Pipeline Designer, cliquez sur le bouton Run (Exécuter) pour ouvrir le panneau vous permettant de sélectionner votre profil d'exécution.
  24. Sélectionnez dans la liste votre profil d'exécution (pour plus d'informations, consultez Profils d'exécution), puis cliquez sur Run (Exécuter) pour exécuter votre pipeline.

Résultats

Votre pipeline de streaming est en cours d'exécution et sera exécuté jusqu'à ce que vous l'arrêtiez. Les données des avions sont modifiées et les informations des geohashes calculés sont envoyés au système cible que vous avez spécifié.