Vous trouverez ci-dessous un exemple de code pour un consommateur Java de Kafka, qui lit des tâches résolues depuis un sujet et envoie des notifications dans une chaîne Slack en particulier :
package myPackage;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.net.*;
import java.util.Arrays;
import java.util.Properties;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public final class SimpleConsumer {
private static final String GROUP_ID = "externalGroupForTest";
private static final String KAFKA_URL = "localhost:9092";
private static final String AUTO_COMMIT_INTERVAL = "1000";
private static final String SESSION_TIMEOUT = "30000";
private static final String TOPIC_NAME = "data-history";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task";
public static final String CURRENT_STATE = "currentState";
public static final String TARGET_STATE = "Resolved";
public static final String TRANSITION_ACTION = "transition";
private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx
";
private static final String SLACK_USER_NAME = "TDS Resolved tasks bot";
public static void main(String[] args) throws Exception {
// Kafka consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_URL);
props.put("group.id", GROUP_ID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL);
props.put("session.timeout.ms", SESSION_TIMEOUT);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
// Kafka consumer subscription
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// Start listening
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// Get data event
DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class);
// Filter transition to Resolved state events
if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) {
// Consume message, here is an example showing how to send a slack notification of the event
// 1. Extract task id, previous and new state
FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get();
// 2. Build message
String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *"
+ fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*.";
JsonObject message = new JsonObject();
message.addProperty("username", SLACK_USER_NAME);
message.addProperty("text", messageBody);
message.addProperty("unfurl_media", false);
message.addProperty("unfurl_links", false);
message.addProperty("link_names", false);
// 3. Send message
sendSlackNotification(message);
}
}
}
}
private static boolean isTargetStateResolved(DataEvent x) {
return x.getDetail().stream()
.anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue()));
}
private static boolean isTransitionEvent(DataEvent x) {
return TRANSITION_ACTION.equals(x.getAction());
}
private static boolean isTaskEvent(DataEvent x) {
return TASK_RESOURCE_TYPE.equals(x.getResourceType());
}
private static String sendSlackNotification(JsonObject msg) throws Exception {
HttpURLConnection connection = null;
try {
// Create connection
final URL url = new URL(SLACK_URL);
connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
connection.setRequestMethod("POST");
connection.setUseCaches(false);
connection.setDoInput(true);
connection.setDoOutput(true);
final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8");
// Send request
final DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
wr.writeBytes(payload);
wr.flush();
wr.close();
// Get Response
final InputStream is = connection.getInputStream();
final BufferedReader rd = new BufferedReader(new InputStreamReader(is));
String line;
StringBuilder response = new StringBuilder();
while ((line = rd.readLine()) != null) {
response.append(line);
response.append('\n');
}
rd.close();
return response.toString();
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
}