始める前に
-
ソースデータを保管するシステムへの接続が作成済みであること。
-
ソースデータを保管するデータセットが追加済みであること。
ここでは、航空機のID、位置、タイムスタンプなどの航空機データをストリーミングします。
このシナリオを理解するために、このシナリオで使用されるストリーミングデータのAVROスキーマを次に示します:
{
"type": "record",
"name": "aircrafts",
"fields": [
{"name": "Id", "type": "int"},
{"name": "PosTime", "type": "long"},
{"name": "Lat", "type": "double"},
{"name": "Long", "type": "double"},
{"name": "Op", "type": "string"}
]
}
Idが航空機の識別子に対応している場合、PosTimeは位置のタイムスタンプに対応し、Lat/Longは航空機の緯度/経度に対応し、Opは航空会社に対応しています。
-
接続および処理済みデータを保管する関連データセットも作成済みであること。
ここでは、MySQLテーブルを使用します。
手順
-
[Pipelines] (パイプライン)ページで[Add pipeline] (パイプラインを追加)をクリックします。新しいパイプラインが開きます。
-
パイプラインに意味のある名前を付けます。
例
ストリーミング航空機データの処理
-
[ADD SOURCE] (ソースを追加)をクリックしてパネルを開きます。このパネルで、ソースデータ(この場合はkafkaの航空機トピック)を選択できます。
-
データセットを選択し、[Select] (選択)をクリックしてパイプラインに追加できるようにします。
必要であれば名前を変更します。
-
をクリックし、パイプラインにWindowプロセッサーを追加します。設定パネルが開きます。
-
プロセッサーに意味のある名前を付けます。
例
5sec window
-
[Configuration] (設定)エリアで以下の操作を行います。
-
[Use Window session] (ウィンドウセッションを使用)トグルを有効にします。
-
ウィンドウ継続時間として5000を入力し、5秒ごとにデータをキャプチャーできるようにします。
-
をクリックし、パイプラインにAggregateプロセッサーを追加します。設定パネルが開きます。
-
プロセッサーに意味のある名前を付けます。
例
group by aircraft
-
[Group by] (グループ基準)エリアで、集計セットに使用するフィールドを選択します。ここでは、.customerIdになります。
-
航空機識別子でグルーピングするには、[Field] (フィールド)リストで.Idを選択します。
-
[New element] (新規エレメント) 追加し、[Field] (フィールド)リストで.Opを選択して航空会社ごとにグルーピングします。
-
[Operations] (操作)エリアで次の操作を行います。
-
[Field] (フィールド)リストで.PosTime、[Operation] (操作)リストでMaximumを選択します。
-
生成されたフィールド([Output field] (出力フィールド))に、たとえばdateという名前を付けます。
-
+記号をクリックして新しいエレメントを追加し、[Field] (フィールド)リストで.Latを、[Operation] (操作)リストで[List] (リスト)をそれぞれ入力します。
-
生成されたフィールドに、たとえばlatという名前を付けます。
-
+記号をクリックして新しいエレメントを追加し、[Field] (フィールド)リストで.Longを、[Operation] (操作)リストで[List] (リスト)をそれぞれ入力します。
-
生成されたフィールドに、たとえば lonという名前を付けます。
-
[Save] (保存)をクリックして設定を保存します。
-
をクリックし、パイプラインにField Selectorプロセッサーを追加します。設定パネルが開きます。
-
プロセッサーに意味のある名前を付けます。
例
select latest position
-
[Advanced] (詳細設定)モードの[Selectors] (セレクター)エリアで:
-
id
フィールドを同じ場所に置いたまま選択する場合は、[Output] (出力)リストにid、そして[Input] (入力)リストに.idとそれぞれ入力します。
-
Op
フィールドを選択して名前を変更する場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにairlinesと、[Input] (入力)リストに.Opとそれぞれ入力します。
-
date
フィールドを選択して同じ場所に保持しておく場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにdateと、[Input] (入力)リストに.dateとそれぞれ入力します。
-
最初の場所の
lat
フィールドを選択してスキーマのトップレベルに移動させる場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにlatと、[Input] (入力)リストに.lat[-1]とそれぞれ入力します。
-
最初の場所の
lon
フィールドを選択してスキーマのトップレベルに移動させる場合は、+記号をクリックして新しいエレメントを追加し、[Output] (出力)リストにlonと、[Input] (入力)リストに.lon[-1]とそれぞれ入力します。
-
[Save] (保存)をクリックして設定を保存します。
-
をクリックし、パイプラインにPython 3プロセッサーを追加します。設定パネルが開きます。
-
プロセッサーに意味のある名前を付けます。
例
ジオハッシュの計算
-
[Python 3 code] (Python 3コード)エリアに、以下を入力します:
def encode(latitude, longitude, precision=12):
__base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
__decodemap = { }
for i in range(len(__base32)):
__decodemap[__base32[i]] = i
del i
lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
geohash = []
bits = [ 16, 8, 4, 2, 1 ]
bit = 0
ch = 0
even = True
while len(geohash) < precision:
if even:
mid = (lon_interval[0] + lon_interval[1]) / 2
if longitude > mid:
ch |= bits[bit]
lon_interval = (mid, lon_interval[1])
else:
lon_interval = (lon_interval[0], mid)
else:
mid = (lat_interval[0] + lat_interval[1]) / 2
if latitude > mid:
ch |= bits[bit]
lat_interval = (mid, lat_interval[1])
else:
lat_interval = (lat_interval[0], mid)
even = not even
if bit < 4:
bit += 1
else:
geohash += __base32[ch]
bit = 0
ch = 0
return ''.join(geohash)
output = json.loads("{}")
output['id'] = input['id']
output['airlines'] = input['airlines']
output['date'] = input['date']
output['location'] = encode(input['lat'], input['lon'])
output.append(output)
このコードを使用すると、ジオハッシュ(緯度と経度から得られる文字列表示の地理的位置)を計算できます。
-
[Save] (保存)をクリックして設定を保存します。
-
[ADD DESTINATION] (デスティネーションを追加)をクリックし、処理済みデータを保持するデータセットを選択します。
必要であれば名前を変更します。
-
(オプション) Python 3プロセッサーのプレビューを表示してデータをプレビューします。
-
Talend Cloud Pipeline Designerの上部ツールバーで[Run] (実行)ボタンをクリックするとパネルが開き、実行プロファイルを選択できるようになります。
-
リストで実行プロファイルを選択し(詳細は実行プロファイルをご覧ください)、[Run] (実行)をクリックしてパイプラインを実行します。
タスクの結果
ストリーミングパイプラインは実行中となり、強制終了するまで実行されます。航空機のデータが変更され、計算したジオハッシュ情報が、指定したターゲットシステムに送信されます。