コンシューマグループの設定 - 7.2

Talend Data Stewardshipユーザーガイド

author
Talend Documentation Team
EnrichVersion
7.2
EnrichProdName
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Real-Time Big Data Platform
task
データガバナンス > キャンペーンの管理
データガバナンス > タスクの割り当て
データガバナンス > データモデルの管理
データクオリティとプレパレーション > セマンティックタイプの管理
データクオリティとプレパレーション > タスク管理
管理と監視 > ユーザーの管理
EnrichPlatform
Talend Data Stewardship
data-historyトピックを購読するコンシューマグループを作成し、フィルターを使用してTalend Data Stewardship[Resolved] (解決済み)状態に移行したタスクに関するイベントのみを消費します。

始める前に

Talend Data Stewardshipが起動しています。

手順

  1. data-historyトピックを購読するためのJavaクラスを作成します:
    • GROUP_IDに、externalGroupForTestなどのコンシューマグループIDを追加します。

      Talend Data Stewardshipで使用したグループIDとは異なる固有の名前を付けて下さい。

    • KAFKA_URLに、Kafkaサーバーのアドレスを入力します。

      このアドレスは、<path_to_installation_folder>/kafka/config/consumer.propertiesにあるbootstrap.serversパラメーターの値と同じでなければなりません。

    • TOPIC_NAMETalend Data Stewardshipのデフォルトのトピックであるdata-historyを入力します。

      トピック名は、<path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.propertiesにあるhistory.kafka.topicパラメーターの値と同じでなければなりません。

  2. 追跡するイベントの種類と条件を定義します。たとえば、次のような場合があります:
    • TASK_RESOURCE_TYPEに、org.talend.datasteward.tasks.model.Taskと入力してタスクを追跡します。
    • CURRENT_STATEに、currentStateと入力して、タスクの現在の状態を確認できるようにします。
    • TARGET_STATEResolvedと入力して、ステータスが[Resolved] (解決済み)と等しいことを確認します。
  3. タスクを[Resolved] (解決済み)状態に移行したときにのみイベントを送信するようにフィルタを定義します。次に例を示します:
    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))

    例え

    次の例は、トピックから解決されたタスクを読み取り、特定のSlackチャネルに通知を送信するKafka Javaコンシューマのサンプルコードを示しています:
    package myPackage;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.net.*;
    import java.util.Arrays;
    import java.util.Properties;
    
    import com.google.gson.JsonObject;
    
    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    
    public final class SimpleConsumer {
        private static final String GROUP_ID = "externalGroupForTest";
        private static final String KAFKA_URL = "localhost:9092";
        private static final String AUTO_COMMIT_INTERVAL = "1000";
        private static final String SESSION_TIMEOUT = "30000";
        private static final String TOPIC_NAME = "data-history";
    
        private static final ObjectMapper objectMapper = new ObjectMapper();
        public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task";
        public static final String CURRENT_STATE = "currentState";
        public static final String TARGET_STATE = "Resolved";
        public static final String TRANSITION_ACTION = "transition";
    
        private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx
    ";
        private static final String SLACK_USER_NAME = "TDS Resolved tasks bot";
    
        public static void main(String[] args) throws Exception {
            // Kafka consumer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", KAFKA_URL);
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL);
            props.put("session.timeout.ms", SESSION_TIMEOUT);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
    
            // Kafka consumer subscription
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            // Start listening
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Get data event
                    DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class);
    
                    // Filter transition to Resolved state events
                    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) {
                        // Consume message, here is an example showing how to send a slack notification of the event
    
                        // 1. Extract task id, previous and new state
                        FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get();
                        // 2. Build message
                        String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *"
                                + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*.";
                        JsonObject message = new JsonObject();
                        message.addProperty("username", SLACK_USER_NAME);
                        message.addProperty("text", messageBody);
                        message.addProperty("unfurl_media", false);
                        message.addProperty("unfurl_links", false);
                        message.addProperty("link_names", false);
    
                        // 3. Send message
                        sendSlackNotification(message);
                    }
                }
            }
        }
    
        private static boolean isTargetStateResolved(DataEvent x) {
           return x.getDetail().stream()
                    .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue()));
        }
    
        private static boolean isTransitionEvent(DataEvent x) {
            return TRANSITION_ACTION.equals(x.getAction());
        }
    
        private static boolean isTaskEvent(DataEvent x) {
            return TASK_RESOURCE_TYPE.equals(x.getResourceType());
        }
    
        private static String sendSlackNotification(JsonObject msg) throws Exception {
            HttpURLConnection connection = null;
            try {
                // Create connection
                final URL url = new URL(SLACK_URL);
                connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
                connection.setRequestMethod("POST");
                connection.setUseCaches(false);
                connection.setDoInput(true);
                connection.setDoOutput(true);
    
                final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8");
    
                // Send request
                final DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
                wr.writeBytes(payload);
                wr.flush();
                wr.close();
    
                // Get Response
                final InputStream is = connection.getInputStream();
                final BufferedReader rd = new BufferedReader(new InputStreamReader(is));
                String line;
                StringBuilder response = new StringBuilder();
                while ((line = rd.readLine()) != null) {
                    response.append(line);
                    response.append('\n');
                }
    
                rd.close();
                return response.toString();
            }  finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }
  4. Kafkaイベントの通知をリアルタイムでSlackに受信するWebhookを使って統合を構築します。
  5. 変更を保存します。

タスクの結果

タスクステータスが[Resolved] (解決済み)に変更されるたびに、履歴イベントがトピックに送信され、通知メッセージがSlack内の指定されたチャネルに送信されます。