Best Practice: Change Data Capture with Spark in Big Data

In the data-driven world, companies that have access to real-time data have a significant advantage over competitors that do not. One of the things that prevents IT from providing real-time data is the time it takes to extract data from legacy systems and load it into analytical systems.

As such, loading all the data into these systems every time is not an efficient use of computing resources, and it also delays getting the data to the users. This is where Change Data Capture (CDC) plays a significant role in integrating data into target systems faster.

What is Change Data Capture?

Change Data Capture (CDC) is an advanced technology for data replication and loading that keeps multiple systems in sync with all the data changes. With CDC, only the changes in the source data need to be loaded into the target system. The changes in the source systems are tracked, and the right load strategy (insert, update, or delete) is applied to the target system. The benefits of using CDC are clear and proven: it saves computing resources, and the data is provisioned to the end users more quickly.
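
As an illustration only, the following SQL sketches how a set of captured changes could be applied to a target table with the right load strategy. The table names, columns, and the op flag are hypothetical, and MERGE syntax varies by database (the sketch below follows SQL Server conventions):

    MERGE INTO target AS t
    USING captured_changes AS c
        ON t.id = c.id
    WHEN MATCHED AND c.op = 'D' THEN DELETE                            -- apply deletes
    WHEN MATCHED THEN UPDATE SET t.value = c.value                     -- apply updates
    WHEN NOT MATCHED THEN INSERT (id, value) VALUES (c.id, c.value);   -- apply inserts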

CDC in traditional databases

Talend provides CDC support for all the traditional relational databases. Talend uses a publish/subscribe architecture in which the publisher captures the changed data and makes it available to the subscribers. The two main CDC modes are the Trigger mode and the Redo/Archive log mode. Almost all the relational databases use Trigger mode, except for Oracle, which uses Redo/Archive log mode to capture changes in the source data.

For Trigger mode to work, CDC must already be enabled on the database. This is typically done by stored procedures within that database. For example, in SQL Server, CDC is enabled by executing sys.sp_cdc_enable_db.
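
A minimal sketch of enabling CDC in SQL Server, assuming a hypothetical database and the employee_table source table (sys.sp_cdc_enable_table and its parameters are the standard SQL Server procedure for enabling CDC on a single table):

    USE my_database;                       -- hypothetical database name
    EXEC sys.sp_cdc_enable_db;             -- enable CDC at the database level

    EXEC sys.sp_cdc_enable_table           -- enable CDC for one source table
        @source_schema = N'dbo',
        @source_name   = N'employee_table',
        @role_name     = NULL;             -- no gating role for reading change data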

Talend needs its own metadata for CDC to work: the SUBSCRIBER table, which tracks the tables monitored for changes, and the change table, which tracks the changes to the data in the table itself. This is primarily done by creating CDC metadata in the repository and using that metadata in the Talend CDC components. Talend provides CDC components such as tOracleCDC, tAS400CDC, tDB2CDC, and so on.

For more information about Change Data Capture, see the Talend Data Fabric User Guide.

CDC can also be achieved by storing the date and time when each record was last updated. A filter on that timestamp can then be applied in the extraction layer of the source system.
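
A minimal sketch of this timestamp-based approach, using the employee_table and cdc_control tables described later in this article (the exact column list is an assumption):

    -- Extract only the records changed since the last successful run,
    -- using the last execution time stored in the control table.
    SELECT emp.*
    FROM employee_table emp, cdc_control cdc
    WHERE cdc.Table_Name = 'employee_table'
      AND emp.Record_DateTime > cdc.Last_executed;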

CDC with Spark in Big Data

This article shows a sample approach to doing CDC using Talend components.

CDC has the same advantages in the big data world. The challenge with using CDC in Hadoop, however, is that Hadoop is not ideal for data updates. Inserting data into Hadoop through Hive is simple, but updates and deletes are not. Because Hadoop is a distributed system where data is stored on multiple nodes across the network, the performance overhead of updating a record is huge.

One of the ways to solve this issue is to create a Hive base (internal) table and a Hive external table, and to build a view on top of them. The base table holds all the data up to the time new records are loaded. The new and changed records are loaded into the external table. Internal tables are typically used when the data is temporary, while external tables are used when the data in the tables is also used outside Hive.
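
For illustration, a sketch of this two-table layout in HiveQL, assuming hypothetical columns that match the employee data model used in this article:

    -- Internal (managed) base table holding the data as of the last load.
    CREATE TABLE employee (
        employeeid      INT,
        name            STRING,
        record_datetime TIMESTAMP
    );

    -- External table receiving the new and changed records.
    CREATE EXTERNAL TABLE employee_extnl (
        employeeid      INT,
        name            STRING,
        record_datetime TIMESTAMP
    )
    LOCATION '/user/talend/employee_extnl';    -- hypothetical HDFS location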

Data Model

You will have two tables in MySQL:
  • employee_table: the source table.
  • cdc_control: holds the name of the table being tracked and the last time it was updated.
The source data is in employee_table.
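
A possible shape of these two MySQL tables, with column names taken from the SQL used later in this article (the other columns and the data types are assumptions):

    CREATE TABLE employee_table (
        EmployeeID      INT,
        Name            VARCHAR(100),
        Record_DateTime DATETIME        -- last-updated timestamp used for the CDC filter
    );

    CREATE TABLE cdc_control (
        Table_Name    VARCHAR(100),     -- name of the table being tracked
        Last_executed DATETIME          -- last time the CDC Job ran for that table
    );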

Step 1: Loading the database table data into the Hive internal table

Procedure

  1. Create a standard Job to load the database table data into the Hive internal table employee.
  2. Use the tHiveCreateTable and tHiveLoad components to load the data into the Hive table.
  3. The Talend Big Data Batch Job is as follows:
    • The data are moved from the MySQL source table employee_table to the employee table in Hive.
    • The tHiveConfiguration_1 and tHDFSConfiguration_1 components set the Hive and HDFS configuration.
    • The tMysqlInput component reads the data from the MySQL database.
    • The tHiveOutput component loads the data into the Hive table.
  4. Run the Job.
    The following shows the data in the employee Hive table after this Job is run:

Step 2: Loading changes from the source database table into the Hive external table

This step reads only the changes from the source database table and loads them into the Hive external table employee_extnl.

Procedure

  1. The Big Data Batch Job is as follows:
    • The source table is filtered by the last updated timestamp, which is maintained in the cdc_control table. This is done by using the following SQL in the Where condition of the tMysqlInput component.

      "where cdc.Table_Name='employee_table' and emp.`Record_DateTime` > cdc.Last_executed"

    • The tAggregateRow component loads one row per run into the cdc_control table. It performs an update-else-insert operation on the table: if a record for the table already exists, it is updated with the run time of the Job (see the sketch after this procedure).

      The run time can be set by using the TalendDate.getCurrentDate() function.

    The following shows the data in the source employee_table table after new records are added:
  2. Run the Job.
    The following shows the data in the employee_extnl external Hive table after the Job is run:
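
As a rough SQL equivalent of the update-else-insert behavior on cdc_control (assuming Table_Name is a unique key; in the Job itself this is handled by the output component's insert-or-update action):

    -- Record the run time for this table: update the existing row if present,
    -- otherwise insert a new one.
    INSERT INTO cdc_control (Table_Name, Last_executed)
    VALUES ('employee_table', NOW())
    ON DUPLICATE KEY UPDATE Last_executed = NOW();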

Step 3: Creating a view and loading it into a final reporting table

In this step, a view is created on top of the base table employee and the external table employee_extnl, so that only the latest data is shown. This view then needs to be loaded into a final reporting table, which is accessed by the subscribers of the data.

Procedure

Use the following SQL:
CREATE VIEW employee_view AS
SELECT t1.* FROM
    (SELECT * FROM employee
     UNION ALL
     SELECT * FROM employee_extnl) t1
JOIN
    (SELECT employeeid, max(record_datetime) max_modified FROM
        (SELECT * FROM employee
         UNION ALL
         SELECT * FROM employee_extnl) t2
     GROUP BY employeeid) s
ON t1.employeeid = s.employeeid AND t1.record_datetime = s.max_modified;

The results from the view are as follows:

The view generates only the latest data.
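
Finally, a minimal sketch of loading the view into a final reporting table in Hive (the table name employee_reporting is an assumption):

    -- Materialize the latest view of the data for the reporting consumers.
    CREATE TABLE employee_reporting AS
    SELECT * FROM employee_view;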