Avant de commencer
Vous avez précédemment créé une connexion au système stockant vos données source.
Vous avez précédemment ajouté le jeu de données contenant vos données source.
Ici, les données de streaming des avions comprennent l'identifiant des avions, leur position et la date et l'heure.
Pour vous aider à comprendre ce scénario, vous trouverez ci-dessous le schéma Avro des données de streaming utilisées dans ce scénario :
{
"type": "record",
"name": "aircrafts",
"fields": [
{"name": "Id", "type": "int"},
{"name": "PosTime", "type": "long"},
{"name": "Lat", "type": "double"},
{"name": "Long", "type": "double"},
{"name": "Op", "type": "string"}
]
}
où Id correspond aux identifiants des avions, PosTime à la date et l'heure de la position, Lat/Long à la latitude et à la longitude des avions, et Op aux compagnies aériennes.
Vous avez créé la connexion et le jeu de données associé qui contiendra les données traitées.
Ici, une table MySQL.
Procédure
-
Cliquez sur ADD PIPELINE dans la page Pipelines. Votre nouveau pipeline s’ouvre.
- Donnez-lui un nom significatif.
Exemple
Process streaming aircraft data
- Cliquez sur ADD SOURCE pour ouvrir le panneau vous permettant de sélectionner vos données source, ici le topic sur les avions sur Kafka.
-
Sélectionnez votre jeu de données et cliquez sur SELECT pour l’ajouter au pipeline.
Renommez-le si nécessaire.
- Cliquez sur
et ajoutez un processeur Window au pipeline. Le panneau de configuration s'ouvre.
- Donnez un nom significatif au processeur.
Exemple
5sec window
- Dans la zone CONFIGURATION :
- Activez l'option Use Window session.
- Renseignez 5000 comme durée de fenêtrage pour capturer les données toutes les 5 secondes.
- Cliquez sur
et ajoutez un processeur Aggregate au pipeline. Le panneau de configuration s'ouvre.
- Donnez un nom significatif au processeur.
Exemple
group by aircraft
- Dans la zone GROUP BY, sélectionnez les champs à utiliser pour votre ensemble d'agrégation : ici, customer_id.
- Sélectionnez .Id dans la liste Field pour grouper les avions par identifiant.
- Ajoutez un nouvel élément (NEW ELEMENT) et sélectionnez .Op dans la liste Field pour grouper les avions par compagnies aériennes.
- Dans la zone OPERATIONS :
- Sélectionnez .PosTime dans la liste Field et Maximum dans la liste Operation.
- Nommez le champ généré (Output field), date par exemple.
- Ajoutez un nouvel élément (NEW ELEMENT), sélectionnez .Lat dans la liste Field et List dans la liste Operation.
- Nommez le champ généré, lat par exemple.
- Ajoutez un nouvel élément (NEW ELEMENT), sélectionnez .Long dans la liste Field et List dans la liste Operation.
- Nommez le champ généré, lon par exemple.
-
Cliquez sur SAVE pour sauvegarder votre configuration.
- Cliquez sur
et ajoutez un processeur Field Selector au pipeline. Le panneau de configuration s'ouvre.
- Donnez un nom significatif au processeur.
Exemple
select latest position
- Dans la zone SELECTORS :
- Saisissez id dans la liste Output et .id dans la liste Input, puisque vous souhaitez sélectionner le champ
id
tout en le conservant au même emplacement.
- Ajoutez un nouvel élément (NEW ELEMENT) et saisissez airlines dans la liste Output et .Op dans la liste Input, puisque vous souhaitez sélectionner et renommer le champ
Op
.
- Ajoutez un nouvel élément (NEW ELEMENT) et saisissez date dans la liste Output et .date dans la liste Input, puisque vous souhaitez sélectionner le champ
date
tout en le conservant au même emplacement.
- Ajoutez un nouvel élément (NEW ELEMENT) et saisissez lat dans la liste Output et .lat[-1] dans la liste Input, puisque vous souhaitez sélectionner le champ
lat
du premier emplacement et le déplacer au niveau supérieur du schéma.
- Ajoutez un nouvel élément (NEW ELEMENT) et saisissez lon dans la liste Output et .lon[-1] dans la liste Input, puisque vous souhaitez sélectionner le champ
lon
du premier emplacement et le déplacer au niveau supérieur du schéma.Vous pouvez utiliser la syntaxe avpath dans cette zone.
-
Cliquez sur SAVE pour sauvegarder votre configuration.
- Cliquez sur
et ajoutez un processeur Python au pipeline. Le panneau de Configuration s’affiche.
- Donnez un nom significatif au processeur.
Exemple
compute geohash
- Sélectionnez Flat map dans la liste Flat map.
- Dans la zone Python code, saisissez :
def encode(latitude, longitude, precision=12):
__base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
__decodemap = { }
for i in range(len(__base32)):
__decodemap[__base32[i]] = i
del i
lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
geohash = []
bits = [ 16, 8, 4, 2, 1 ]
bit = 0
ch = 0
even = True
while len(geohash) < precision:
if even:
mid = (lon_interval[0] + lon_interval[1]) / 2
if longitude > mid:
ch |= bits[bit]
lon_interval = (mid, lon_interval[1])
else:
lon_interval = (lon_interval[0], mid)
else:
mid = (lat_interval[0] + lat_interval[1]) / 2
if latitude > mid:
ch |= bits[bit]
lat_interval = (mid, lat_interval[1])
else:
lat_interval = (lat_interval[0], mid)
even = not even
if bit < 4:
bit += 1
else:
geohash += __base32[ch]
bit = 0
ch = 0
return ''.join(geohash)
output = json.loads("{}")
output['id'] = input['id']
output['airlines'] = input['airlines']
output['date'] = input['date']
output['location'] = encode(input['lat'], input['lon'])
output.append(output)
Ce code vous permet de calculer les geohashes (chaînes de caractères indiquant la position géographique résultant de la latitude et de la longitude).
-
Cliquez sur SAVE pour sauvegarder votre configuration.
- Cliquez sur l'élément ADD DESTINATION et sélectionnez le jeu de données qui contiendra les données traitées.
Renommez-le si nécessaire.
- (Facultatif) Examinez la prévisualisation du processeur Python afin de prévisualiser vos données.
-
Dans la barre d'outils en haut de Talend Cloud Pipeline Designer, sélectionnez votre profil d'exécution dans la liste (pour plus d'informations, consultez Run profiles).
-
Cliquez sur l'icône d'exécution
pour exécuter votre pipeline.
Résultats
Votre pipeline de streaming est en cours d'exécution et sera exécuté jusqu'à ce que vous l'arrêtiez. Les données des avions sont modifiées et les informations des geohashes calculés sont envoyés au système cible que vous avez spécifié.