始める前に
ソースデータを保管するシステムへの接続が作成済みであること。
ソースデータを保管するデータセットが追加済みであること。
ここでは、航空機のID、位置、タイムスタンプなどの航空機データをストリーミングします。
このシナリオを理解するために、このシナリオで使用されるストリーミングデータのAVROスキーマを次に示します:
{
"type": "record",
"name": "aircrafts",
"fields": [
{"name": "Id", "type": "int"},
{"name": "PosTime", "type": "long"},
{"name": "Lat", "type": "double"},
{"name": "Long", "type": "double"},
{"name": "Op", "type": "string"}
]
}
Idが航空機の識別子に対応している場合、PosTimeは位置のタイムスタンプに対応し、Lat/Longは航空機の緯度/経度に対応し、Opは航空会社に対応しています。
接続および処理済みデータを保管する関連データセットも作成済みであること。
ここでは、MySQLテーブルを使用します。
手順
-
[Pipelines] (パイプライン)ページで[ADD PIPELINE] (パイプラインの追加)をクリックします。新しいパイプラインが開きます。
- パイプラインに意味のある名前を付けます。
例え
ストリーミング航空機データの処理
- [ADD SOURCE] (ソースの追加)をクリックしてパネルを開くと、ここでは、kafkaの航空機トピックでソースデータを選択できます。
-
データセットを選択し、[SELECT] (選択)をクリックしてパイプラインに追加します。
必要であれば名前を変更します。
をクリックしてパイプラインにWindowプロセッサーを追加します。設定パネルが開きます。
- プロセッサーに意味のある名前を付けます。
例え
5sec window
- [CONFIGURATION] (設定)エリアで以下の操作を行います。
- [Use Window session] (ウィンドウセッションを使用)トグルを有効にします。
- 5秒ごとにデータをキャプチャするために、ウィンドウ継続時間として5000を入力します。
をクリックして、パイプラインにAggregateプロセッサーを追加します。設定パネルが開きます。
- プロセッサーに意味のある名前を付けます。
例え
group by aircraft
- [GROUP BY] (分類基準)エリアで、集計セットに使用するフィールドを選択します: ここでは、.customerIdになります。
- 航空機識別子でグループ化するには、[Field] (フィールド)リストで.Idを選択します。
- [NEW ELEMENT] (新規エレメント) 追加し、[Field] (フィールド)リストで.Opを選択して航空会社ごとにグループ化します。
- [OPERATIONS] (操作)エリアで次の操作を行います。
- [Field] (フィールド)リストで.PosTime、[Operation] (操作)リストでMaximumを選択します。
- 生成されたフィールド([Output field] (出力フィールド))に、たとえばdateという名前を付けます。
- [NEW ELEMENT] (新規エレメント)追加し、[Field] (フィールド)リストで.Lat、[Operation] (操作)リストでListを選択します。
- 生成されたフィールドに、たとえばlatという名前を付けます。
- [NEW ELEMENT] (新規エレメント)追加し、[Field] (フィールド)リストで.Long、[Operation] (操作)リストで[List] (リスト)を選択します。
- 生成されたフィールドに、たとえば lonという名前を付けます。
-
[Save] (保存)をクリックして設定を保存します。
をクリックし、パイプラインにField Selectorプロセッサーを追加します。設定パネルが開きます。
- プロセッサーに意味のある名前を付けます。
例え
select latest position
- [SELECTORS] (セレクター)エリアで次の操作を行います。
id
フィールドを同じ場所に置いたまま選択する場合は、[Output] (出力)リストにid、そして[Input] (入力)リストに.idとそれぞれ入力します。
- [NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにairlines、そして[Input] (入力)リストに.Opとそれぞれ入力して、
[Op]
フィールドを選択して名前を変更します。
date
フィールドを同じ場所に配置したまま選択する場合は、[NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにdate、そして[Inputh] (入力)リストに.dateとそれぞれ入力します。
- 元の場所の
lat
フィールドを選択してスキーマの下位レベルに移動させる場合は、[NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにlat、そして[Input] (入力)リストに.lat[-1]とそれぞれ入力します。
- 元の場所の
lon
フィールドを選択してスキーマの下位レベルに移動させる場合は、[NEW ELEMENT] (新規エレメント)を追加し、[Output] (出力)リストにlon、そして[Input] (入力)リストに.lon[-1]とそれぞれ入力します。
-
[Save] (保存)をクリックして設定を保存します。
をクリックし、パイプラインにPythonプロセッサーを追加します。[Configuration] (設定)パネルが開きます。
- プロセッサーに意味のある名前を付けます。
例え
ジオハッシュの計算
- [Flat map] (フラットマップ)リストで[Flat map] (フラットマップ)を選択します。
- [Python code] (Pythonコード)エリアに、以下を入力します。
def encode(latitude, longitude, precision=12):
__base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
__decodemap = { }
for i in range(len(__base32)):
__decodemap[__base32[i]] = i
del i
lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
geohash = []
bits = [ 16, 8, 4, 2, 1 ]
bit = 0
ch = 0
even = True
while len(geohash) < precision:
if even:
mid = (lon_interval[0] + lon_interval[1]) / 2
if longitude > mid:
ch |= bits[bit]
lon_interval = (mid, lon_interval[1])
else:
lon_interval = (lon_interval[0], mid)
else:
mid = (lat_interval[0] + lat_interval[1]) / 2
if latitude > mid:
ch |= bits[bit]
lat_interval = (mid, lat_interval[1])
else:
lat_interval = (lat_interval[0], mid)
even = not even
if bit < 4:
bit += 1
else:
geohash += __base32[ch]
bit = 0
ch = 0
return ''.join(geohash)
output = json.loads("{}")
output['id'] = input['id']
output['airlines'] = input['airlines']
output['date'] = input['date']
output['location'] = encode(input['lat'], input['lon'])
output.append(output)
このコードを使用すると、ジオハッシュ(緯度と経度から得られる文字列表示の地理的位置)を計算できます。
-
[Save] (保存)をクリックして設定を保存します。
- [ADD DESTINATION] (デスティネーションの追加)アイテムをクリックし、処理済みデータを保存するデータセットを選択します。
必要であれば名前を変更します。
- (オプション) Pythonプロセッサーのプレビューを表示してデータをプレビューします。
-
Talend Cloud Pipeline Designerの上部のツールバーで、リストから実行プロファイルを選択します(詳細は実行プロファイルを参照してください)。
-
実行アイコンをクリックしてパイプラインを実行します。
タスクの結果
ストリーミングパイプラインは実行中となり、強制終了するまで実行されます。航空機のデータが変更され、計算したジオハッシュ情報が、指定したターゲットシステムに送信されます。