Developing a component using Talend Component Kit (Deprecated)

author
Talend Documentation Team
EnrichVersion
6.4
EnrichProdName
Talend Big Data Platform
Talend Big Data
Talend Data Integration
Talend Data Services Platform
Talend ESB
Talend Open Studio for Data Integration
Talend Open Studio for ESB
Talend Data Fabric
Talend Open Studio for MDM
Talend Real-Time Big Data Platform
Talend MDM Platform
Talend Data Management Platform
Talend Open Studio for Big Data
task
Design and Development > Designing Components
EnrichPlatform
Talend Studio

This tutorial is deprecated because a new Talend Component Kit framework has been introduced in the Studio from version 7.0 onwards. For more information about the new framework, see this new tutorial.

Talend Component Kit provides a scalable component development framework with a rich set of pre-defined engines, classes, methods and functions, to streamline your component development process and make your component easily portable across Talend products.

This framework is required to develop components that can obtain optimal performance in different Talend tools.

Breaking changes may occur in the Component API before it reaches its first stable major version. While Talend will do its best to limit such changes, keep this possibility in mind when developing your component.

For demonstration purposes, this tutorial develops a component that reads data from Google Fusion Tables, a REST-based application.

The project presented in this tutorial is hosted here. You can download it using the following command:
git clone git@github.com:Talend/components.git --single-branch --branch developing_a_component_using_talend_component_kit --depth 1
If you are interested in the development history of this project, run the following command to fetch the whole commit history:
git clone git@github.com:Talend/components.git --single-branch --branch developing_a_component_using_talend_component_kit --depth 33
Prerequisites:
  1. Solid Java development knowledge.
  2. A Google account.
  3. Basic knowledge of Google Fusion Tables. You can follow this Google tutorial to become familiar with this Google application.
  4. The Fusion Tables API enabled in your Google Cloud Platform console, so that you can obtain the API key.
  5. OAuth 2.0 credentials for Google Fusion Tables. These credentials are stored in a .json file.

Note that these operations are performed on the Google side.

You also need to analyze the application API from the following aspects:
  • What kind of data entities are accessible for processing? In Google Fusion Tables, the available data entities are: Table, Column, Template, Style and Row.

  • What operations can you perform on these entities, and how should these operations be divided among components? The actions on data allowed by Google Fusion Tables are: List, Get, Insert, Update and Delete.

  • What properties need to be defined to support these operations, such as the connection to Google Fusion Tables and the authentication? Design these properties so as to avoid code duplication.

Creating the skeleton of your component

Use a Maven archetype provided by Talend to automatically create the skeleton of a Talend component and update this skeleton to create your component.

Currently, only the archetype for source components (input components) is available.

Before you begin

Ensure that the following is set up:
  • Java 8 or later is properly installed on your machine. Note that this Java version requirement applies to component development only. If you test your component in a Talend product, make sure that the environment required by that product is set up.
  • Maven 3 or later is properly installed on your machine.
  • A Java Integrated Development Environment such as Eclipse or IntelliJ is properly set up.

Procedure

  1. Create a folder to store the resource files of the component you want to create.

    For example, C:/my_components.

  2. Check the API version of the components of your Talend Studio.
    1. Open the plugins folder of your Studio.
    2. Search for the keyword org.talend.components.api in this folder.

      The version in the name of the org.talend.components.api_xxx.jar file is the version to be used in the following step. In this tutorial, this file is org.talend.components.api_0.19.0.jar, so the version to be used is 0.19.0.

  3. In the command-line interface of your machine, copy-paste the following command, change the archetype version to the one you obtained in the step above and execute it.

    In the tutorial, the archetype version to be used is 0.19.0.

    mvn org.apache.maven.plugins:maven-archetype-plugin:2.4:generate -DarchetypeGroupId=org.talend.components -DarchetypeArtifactId=input-component-archetypes -DarchetypeVersion=0.19.0 -DarchetypeRepository=https://artifacts-oss.talend.com/nexus/content/repositories/TalendOpenSourceRelease/

    The archetype used in this command generates an input component.

    If you copy and paste this command from a PDF file, it may be broken into several lines. Restore it to a single line before executing it.

  4. In the command-line interface, the command prompts you to define the archetype properties of the component to be developed.
    1. Enter n when you are asked whether you want to use the default component profile and press Enter. This means that you do not use the default profile and define a custom one.
    2. Enter the group ID you want to use for your component following the pattern of the example and press Enter.

      In this tutorial, enter org.talend.components, which is actually the same as the default one.

    3. Enter the artifact ID of your component following the pattern of the example and press Enter.

      In this tutorial, enter components-google, as the component to be developed is expected to work with Google Fusion Tables.

    4. Enter the version of your component following the pattern of the example and press Enter.

      In this tutorial, directly press Enter without entering any custom version. Thus the default version 0.1.0-SNAPSHOT is used.

    5. Enter the resource package name of your component following the pattern of the example and press Enter.

      In this tutorial, enter org.talend.components.google.

    6. Enter the technical name of your component and press Enter. As this name is used as the prefix of the generated classes, capitalize its initial letter to follow the Java naming convention.

      In this tutorial, enter TGoogleFusionTableInput.

    7. Enter the package name of your component following the example and press Enter. This name must be all in lower case.

      In this tutorial, enter tgooglefusiontableinput.

  5. The whole profile you just defined is returned. Review it and, if you do not want to change it, enter Y and press Enter to confirm it and build the component skeleton. Otherwise, enter N and press Enter to restart the configuration process.

    When the build is done, the components-google folder is created in the my_components folder. It contains a pom.xml file, the components-google-definition folder and the components-google-runtime folder.

    If you need more explanations about this archetype and the component artifacts it builds, see https://github.com/Talend/components/wiki/1.-Build-settings-for-new-components.

  6. In your command-line interface, go to this components-google folder and execute mvn clean install -l build.log to build the component with all default configurations and verify that everything is correct up to this step.

    Known issue: the <classifier>bundle</classifier> line could be missing from the pom.xml file generated for the definition part. In this case, manually add this line between the <configuration> and the <instructions> tags of the maven-bundle-plugin artifact in this pom file.

    For details of this known issue, see https://jira.talendforge.org/browse/TCOMP-627.
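Step 2 of the procedure above (finding the component API version in the Studio plugins folder) can also be scripted. The following is a minimal, stdlib-only sketch; the class name ComponentApiVersion is hypothetical, and it assumes that the jar is named org.talend.components.api_<version>.jar and sits directly in the plugins folder, as in the example above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds the component API version in a Talend Studio plugins folder. */
class ComponentApiVersion {

    // "org.talend.components.api_0.19.0.jar" -> "0.19.0"
    // Assumption: the jar follows the <bundle>_<version>.jar naming shown in this tutorial.
    private static final Pattern API_JAR = Pattern.compile("^org\\.talend\\.components\\.api_(.+)\\.jar$");

    /** Returns the version encoded in the file name, if the file is the component API jar. */
    static Optional<String> extractVersion(String fileName) {
        Matcher matcher = API_JAR.matcher(fileName);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    /** Looks for the API jar directly inside the plugins folder (no recursion). */
    static Optional<String> findVersion(Path pluginsDir) throws IOException {
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(pluginsDir, "org.talend.components.api_*.jar")) {
            for (Path jar : jars) {
                Optional<String> version = extractVersion(jar.getFileName().toString());
                if (version.isPresent()) {
                    return version;
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ComponentApiVersion <Studio installation>/plugins
        Path pluginsDir = Paths.get(args.length > 0 ? args[0] : "plugins");
        System.out.println(findVersion(pluginsDir).orElse("org.talend.components.api jar not found"));
    }
}
```

The glob passed to Files.newDirectoryStream already excludes bundles such as org.talend.components.api.service_*, because it requires the underscore right after "api"; the regular expression then only extracts the version part.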

Registering the component

Registering the component means technically defining the purpose and type of the component, such as, in this tutorial, a source (input) component that reads data from Google Fusion Tables.

Procedure

  1. Import your component skeleton into Eclipse or IntelliJ. As the skeleton is a Maven project, you need to import it from its parent level, that is to say, from the directory in which the parent pom is stored.

    In this tutorial, this directory is C:/my_components/components-google.

    If you need further information about how to import a Maven project into your Eclipse or your IntelliJ, see Eclipse documentation or IntelliJ documentation accordingly.

    In this tutorial, Eclipse is used. Once the import is done, the classes to be updated are listed in the Package Explorer view.

  2. In the Package Explorer view, open class org.talend.components.google.TGoogleFusionTableInputFamilyDefinition in the components-google-definition folder and rename this class to GoogleFusionTableFamilyDefinition. The purpose of this renaming is to define a technical family that is generic enough to register all the components related to Google Fusion Tables you could create, including the current tGoogleFusionTableInput.
  3. Double-click GoogleFusionTableFamilyDefinition.java to open it in the Eclipse workspace and define the family name to be used to register tGoogleFusionTableInput. Note that this family name is only a technical name and is therefore not displayed in the Studio user interface.
    In this example, replace
    public static final String NAME = "TGoogleFusionTableInput";
    with
    public static final String NAME = "GoogleFusionTable";

    Thus GoogleFusionTable is defined as family name.

  4. In the same class, verify that this family contains the definition of the tGoogleFusionTableInput component.
    In this tutorial, the lines used to group the component definitions read:
    public GoogleFusionTableFamilyDefinition() {
        super(NAME, new TGoogleFusionTableInputDefinition());
    }
  5. Open class org.talend.components.google.tgooglefusiontableinput.TGoogleFusionTableInputDefinition, then, in this class, register tGoogleFusionTableInput and declare the general information of the component.
    1. Declare public static final String COMPONENT_NAME = "tGoogleFusionTableInput"; //$NON-NLS-1$ to replace the default one. This defines the component name to be displayed in the Studio.
    2. Remove , ExecutionEngine.BEAM from public TGoogleFusionTableInputDefinition() { super(COMPONENT_NAME, ExecutionEngine.DI, ExecutionEngine.BEAM); }, because in this tutorial, tGoogleFusionTableInput is developed to run in a DI Job only.
    3. Declare public String[] getFamilies() { return new String[] { "Cloud/Fusion Tables" }; //$NON-NLS-1$ } to replace the default one: public String[] getFamilies() { return new String[] { "File/Input" }; //$NON-NLS-1$ }. This defines the component family that will be displayed in the Studio.
    4. Leave the other code as is.

Defining component properties

Specify the properties your component needs to use to perform actions and define how these properties are displayed in a Talend product such as the Studio.
The analysis of the Google Fusion Tables API suggests that the following properties need to be defined:
  1. The connection properties:
    • Client id and Client secret.

      The callback host and callback port are handled by the Google client library, which uses localhost as the callback host and a random free port. The client library opens the system browser to get the user's consent and starts a local server to receive the authorization code.

  2. The table properties:
    • Table id

    • Table schema

    • Query
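You do not need to write any callback code yourself, because the Google client library handles it. Still, to make "localhost as callback host and a random free port" concrete, here is a stdlib-only illustration. LoopbackCallbackSketch and its methods are hypothetical and are not the Google library's API; the sketch only shows how an operating system hands out a free loopback port and how an authorization code is read from the redirect's query string.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;

/**
 * Illustration only: what a "localhost callback on a random free port" involves.
 * The Google client library does this for you; this class is hypothetical.
 */
class LoopbackCallbackSketch {

    /**
     * Binds to port 0 on the loopback interface so that the operating system picks a free port.
     * A real receiver keeps this socket open and serves the redirect on it; closing it here
     * just keeps the illustration short.
     */
    static int pickFreeLoopbackPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            return socket.getLocalPort();
        }
    }

    /** Extracts the "code" query parameter from a request target such as "/cb?code=abc&state=x". */
    static String extractAuthorizationCode(String requestTarget) {
        int queryStart = requestTarget.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : requestTarget.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals("code")) {
                try {
                    return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException("UTF-8 is always supported", e);
                }
            }
        }
        return null; // for example "?error=access_denied" when the user refuses consent
    }

    public static void main(String[] args) throws IOException {
        int port = pickFreeLoopbackPort();
        System.out.println("Callback would listen on http://localhost:" + port);
        System.out.println(extractAuthorizationCode("/cb?code=4%2Fabc&scope=x")); // 4/abc
    }
}
```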

Defining the connection properties

The component uses these properties to connect to Google Fusion Tables.

Procedure

  1. In the components-google-definition folder, create class org.talend.components.google.connection.GoogleFusionTableConnectionProperties.
    
        import org.talend.daikon.properties.PropertiesImpl;

        public class GoogleFusionTableConnectionProperties extends PropertiesImpl {

        }
    
  2. Fill this class with the above-mentioned connection properties: Client id and Client secret.
    1. Import the dependencies.
      import java.util.EnumSet;
      import org.talend.daikon.properties.PropertiesImpl;
      import org.talend.daikon.properties.presentation.Form;
      import org.talend.daikon.properties.property.Property;
      import org.talend.daikon.properties.property.PropertyFactory;
    2. Define the required properties.
      private static final long serialVersionUID = 5928871310221499828L;

      /**
       * Constructor sets properties name
       *
       * @param name name of this properties instance
       */
      public GoogleFusionTableConnectionProperties(String name) {
          super(name);
      }

      public final Property<String> clientId = PropertyFactory.newString("clientId");

      public final Property<String> clientSecret = PropertyFactory.newString("clientSecret")
              .setFlags(EnumSet.of(Property.Flags.ENCRYPT, Property.Flags.SUPPRESS_LOGGING));
      In the code:
      • The Property class and the PropertyFactory class are used to define properties.

      • Fields of the Property class must be public but not static.

      • The name of each Property field must be the same as the name of the property it defines. For example, the clientId field defines the clientId property.

      • Each field should be parameterized with the type of the value it stores. For example, Property<String> is used for string values such as a user name or a path, and Property<Integer> is used for numbers such as a timeout.

    3. In resources, create a file connection/GoogleFusionTableConnectionProperties.properties to define the display names of the two connection properties.
      property.clientId.displayName=Client Id
      property.clientSecret.displayName=Client Secret
  3. In the same class, use the setupProperties() method to define the default values of these properties. The first line in the implementation of this method must be super.setupProperties();.
    @Override
    public void setupProperties() {
     super.setupProperties();
     clientId.setValue("");
     clientSecret.setValue("");
    }

    If you implement this method without specifying default values, the default implementation from the superclass is used.

  4. In the same class, use the setupLayout() method to define the layout of these properties.
    1. Import the dependency.
      import org.talend.daikon.properties.presentation.Form;
    2. Implement setupLayout() starting with the line: super.setupLayout();.
      @Override
      public void setupLayout() {
       super.setupLayout();
       Form mainForm = new Form(this, Form.MAIN);
       mainForm.addRow(clientId);
       mainForm.addRow(clientSecret);
      }
      The Form class and the Widget class are used to construct a view in the user interface.
      • A Widget instance (a widget) defines the UI of a specific property, and a Form instance (a form) contains a set of widgets or other forms.

      • The three major forms are:

        MAIN

        Constructs the Basic settings view in the Studio.

        ADVANCED

        Constructs the Advanced settings view in the Studio.

        REFERENCE

        Constructs the view specific to a property.

      • The Form class uses two methods to place properties and widgets:

        addRow()

        Places a widget in a new row under its previous widget.

        addColumn()

        Places a widget in the same row, to the right of its previous widget.

    3. Open GoogleFusionTableConnectionProperties.properties to define the display names of the created form.
      form.Main.title=Main
      form.Main.displayName=Main
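The display names defined in the .properties files above follow a key convention: property.<propertyName>.displayName for properties and form.<FormName>.displayName for forms. The following stdlib-only sketch reads such a file with java.util.Properties, which can be handy for checking that every property has a display name. The class DisplayNames is hypothetical and is not the daikon i18n mechanism; falling back to the raw name when a key is missing is a choice of this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Hypothetical helper: resolves display names from the content of a .properties file
 * that follows the key convention used in this tutorial. Not part of the daikon API.
 */
class DisplayNames {
    private final Properties bundle = new Properties();

    private DisplayNames() {
    }

    /** Parses the text of a file such as GoogleFusionTableConnectionProperties.properties. */
    static DisplayNames fromString(String text) {
        DisplayNames names = new DisplayNames();
        try {
            names.bundle.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader performs no real I/O, so this is not expected to happen.
            throw new UncheckedIOException(e);
        }
        return names;
    }

    /** Returns property.<name>.displayName, or the raw property name when the key is missing. */
    String forProperty(String propertyName) {
        return bundle.getProperty("property." + propertyName + ".displayName", propertyName);
    }

    /** Returns form.<name>.displayName, or the raw form name when the key is missing. */
    String forForm(String formName) {
        return bundle.getProperty("form." + formName + ".displayName", formName);
    }

    public static void main(String[] args) {
        DisplayNames names = DisplayNames.fromString(
                "property.clientId.displayName=Client Id\n"
                + "property.clientSecret.displayName=Client Secret\n"
                + "form.Main.title=Main\n"
                + "form.Main.displayName=Main\n");
        System.out.println(names.forProperty("clientId"));     // Client Id
        System.out.println(names.forProperty("clientSecret")); // Client Secret
        System.out.println(names.forForm("Main"));             // Main
    }
}
```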

Results

The GoogleFusionTableConnectionProperties class is defined.

A refreshLayout() method is also available to define how the layout changes depending on users' input, but it is not needed in this example. For further information, see User interaction callbacks.
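The field rules listed for GoogleFusionTableConnectionProperties (public, non-static, and field name equal to property name) can be checked mechanically with reflection. The following stdlib-only sketch uses a hypothetical MiniProperty class in place of daikon's Property<T> so that it runs without Talend dependencies; PropertyFieldRules and SampleConnectionProperties are illustrative names, and the sketch is not part of the framework.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for daikon's Property<T>: only the name matters for this check. */
class MiniProperty<T> {
    private final String name;

    MiniProperty(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

/** Reports violations of the Property field rules described for the connection properties. */
class PropertyFieldRules {

    static List<String> check(Object propertiesInstance) {
        List<String> problems = new ArrayList<>();
        for (Field field : propertiesInstance.getClass().getDeclaredFields()) {
            if (!MiniProperty.class.isAssignableFrom(field.getType())) {
                continue; // the rules only apply to property fields
            }
            int modifiers = field.getModifiers();
            if (!Modifier.isPublic(modifiers)) {
                problems.add(field.getName() + ": must be public");
            }
            if (Modifier.isStatic(modifiers)) {
                problems.add(field.getName() + ": must not be static");
                continue; // a static field is not tied to this instance
            }
            try {
                field.setAccessible(true); // lets the check read non-public fields to report on them
                MiniProperty<?> property = (MiniProperty<?>) field.get(propertiesInstance);
                if (property != null && !field.getName().equals(property.getName())) {
                    problems.add(field.getName() + ": property is named \"" + property.getName() + "\"");
                }
            } catch (IllegalAccessException e) {
                problems.add(field.getName() + ": not readable (" + e.getMessage() + ")");
            }
        }
        return problems;
    }
}

/** Follows the rules: public, non-static, field name equal to property name. */
class SampleConnectionProperties {
    public final MiniProperty<String> clientId = new MiniProperty<>("clientId");
    public final MiniProperty<String> clientSecret = new MiniProperty<>("clientSecret");
}
```

For example, PropertyFieldRules.check(new SampleConnectionProperties()) returns an empty list, while a field declared as clientId but created with the name "client_id" is reported as a mismatch.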

Defining the table properties

Define the properties to be used to read data from Google Fusion Tables.

Procedure

  1. In the components-google-definition folder, create class org.talend.components.google.table.GoogleFusionTableProperties.
    import org.talend.daikon.properties.PropertiesImpl;

    public class GoogleFusionTableProperties extends PropertiesImpl {
        /**
         * Constructor sets properties name
         *
         * @param name name of this properties instance
         */
        public GoogleFusionTableProperties(String name) {
            super(name);
        }
    }
  2. Fill this class with the above-mentioned table properties: Table id, Table schema and Query.
    1. Import the dependencies.
      import org.talend.components.common.SchemaProperties;
      import org.talend.daikon.properties.property.Property;
      import org.talend.daikon.properties.property.PropertyFactory;
    2. Define the required properties.
       private static final long serialVersionUID = -4353914708920008905L;
       
       public final Property<String> tableId = PropertyFactory.newString("tableId");
       
       public final SchemaProperties tableSchema = new SchemaProperties("tableSchema");
          
       public final Property<String> query = PropertyFactory.newString("query");
                              
      In the code:
      • The Table schema property is a schema-specific property. It describes the fields of the data retrieved from Google Fusion Tables and their types. You must use the SchemaProperties class, a common class in the Talend Component Kit framework, to define the schema property.

      • The Query property is a read-only property to show the SQL query generated from Table schema to be used in REST calls. A read-only flag needs to be set in the implementation of the setupLayout() method.

      • Note that tableSchema is declared as an instance of the SchemaProperties class, which is itself a Properties class, whereas tableId and query are declared as fields of the Property class. For further information about Properties and Property, see Component model.

    3. In resources, create a file table/GoogleFusionTableProperties.properties to define the display names of the table properties.
      property.tableId.displayName=Table Id
      property.query.displayName=SQL Query
  3. In the same class, use the setupProperties() method to define the default values of these table properties.
    @Override
    public void setupProperties() {
      super.setupProperties();
      tableId.setValue("");
      query.setValue("");
    }

    You do not need to initialize the Table schema property (tableSchema) yourself in this method. The Component Kit framework does this automatically.

  4. Implement the setupLayout() method to define the layout of these table properties.
    1. Import the dependencies.
      import org.talend.daikon.properties.presentation.Form;
      import org.talend.daikon.properties.presentation.Widget;
    2. Implement setupLayout().
      @Override
      public void setupLayout() {
        super.setupLayout();
        Form mainForm = new Form(this, Form.MAIN);
        mainForm.addRow(tableId);
        mainForm.addRow(tableSchema.getForm(Form.REFERENCE));
        mainForm.addRow(Widget.widget(query).setReadonly(true));
      }
      • All the properties are placed in the MAIN form, that is to say, in the Basic settings view in the Studio.

      • The Table schema property has a specific view in the REFERENCE form and this view is opened from the MAIN form.

    3. Open table/GoogleFusionTableProperties.properties to define the display names of the created form.
      form.Main.title=Main
      form.Main.displayName=Main
  5. Implement interaction triggers for the table properties. These triggers let the component react when users change the property values.

    In the following steps, an after[PropertyName]() method is used to trigger the generation of the SQL query. However, this method is implemented differently for the tableSchema field, which is a SchemaProperties instance, and for the tableId field, which is a Property. In addition, the first letter after the keyword "after" must be upper-case.

    1. Update public final SchemaProperties tableSchema = new SchemaProperties("tableSchema"); to enable SQL query generation after the table schema is changed.
      public final SchemaProperties tableSchema = new SchemaProperties("tableSchema") {        
       /**
       * Computes and sets SQL query after Table Schema is changed
       */
          public void afterSchema() {
            query.setValue(buildQuery());
            refreshLayout(getForm(Form.MAIN));
          }        
      };

      For tableSchema, an inner class is declared based on SchemaProperties and the after[PropertyName]() trigger, afterSchema() in this example, is added to this inner class.

      This is because the after[PropertyName]() method cannot be directly added to a field of the Properties class such as this SchemaProperties class but can only be used either in the Properties class itself or in an inner class as explained above.

    2. Add the triggers to build the SQL query based on Table id and Table schema. This code also requires the following imports:
      import java.util.List;
      import org.apache.avro.Schema;
      import org.apache.avro.Schema.Field;
      import org.talend.daikon.avro.AvroUtils;

      /**
       * Computes and sets SQL query after Table Id is changed
       */
      public void afterTableId() {
          query.setValue(buildQuery());
          refreshLayout(getForm(Form.MAIN));
      }

      /**
       * Builds SQL query to retrieve rows from the fusion table.
       * If Table Id or Table Schema wasn't set, it returns an empty string.
       * Builds SQL query according to the following pattern: "SELECT [fields] FROM [tableId]"
       *
       * @return SQL query
       */
      protected String buildQuery() {
          if (!isTableIdSet() || !isTableSchemaSet()) {
              return "";
          }

          StringBuilder selectQuery = new StringBuilder();
          selectQuery.append("SELECT ");
          selectQuery.append(schemaFields());
          selectQuery.append(" FROM ");
          selectQuery.append(tableId.getValue());
          return selectQuery.toString();
      }

      /**
       * Checks whether Table Id property is set
       *
       * @return true if it is set, false otherwise
       */
      private boolean isTableIdSet() {
          String tableIdValue = tableId.getValue();
          return tableIdValue != null && !tableIdValue.trim().isEmpty();
      }

      /**
       * Checks whether Table Schema property is set and not empty
       *
       * @return true if it is set and not empty, false otherwise
       */
      private boolean isTableSchemaSet() {
          Schema tableSchemaValue = tableSchema.schema.getValue();
          return tableSchemaValue != null && !AvroUtils.isSchemaEmpty(tableSchemaValue);
      }

      /**
       * @return a string of table schema field names separated by comma
       * (only called when isTableSchemaSet() is true, so the field list is not empty)
       */
      private String schemaFields() {
          List<Field> fields = tableSchema.schema.getValue().getFields();
          StringBuilder sb = new StringBuilder();
          for (int i = 0; i < fields.size() - 1; i++) {
              sb.append(fields.get(i).name());
              sb.append(',');
          }
          sb.append(fields.get(fields.size() - 1).name());
          return sb.toString();
      }

      The after[PropertyName]() method for the tableId field, afterTableId() in this example, is added directly to the class.

      The before[PropertyName] and the validate[PropertyName] methods are also available to trigger changes, but they are not needed in this class.
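To see the query rule and the trigger naming convention in isolation, here is a stdlib-only sketch. TableQueryModel and AfterTriggerDispatcher are hypothetical: plain String and List fields stand in for Property<String> and the Avro schema, and the reflective dispatcher only illustrates why the letter after "after" must be upper-case. It is not how the Component Kit framework actually invokes triggers. Note that in the tutorial, afterSchema() reacts to the schema property nested in SchemaProperties (tableSchema.schema), which is presumably why it is not named afterTableSchema().

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free model of GoogleFusionTableProperties: plain fields stand in
 * for Property<String> and for the Avro schema (reduced to a list of field names).
 */
class TableQueryModel {
    public String tableId = "";
    public final List<String> schemaFieldNames = new ArrayList<>();
    public String query = "";

    /** Same rule as buildQuery() above: empty unless both Table Id and the schema are set. */
    String buildQuery() {
        if (tableId == null || tableId.trim().isEmpty() || schemaFieldNames.isEmpty()) {
            return "";
        }
        return "SELECT " + String.join(",", schemaFieldNames) + " FROM " + tableId;
    }

    // Triggers named after[PropertyName](), with the first letter of the property name upper-cased.
    public void afterTableId() {
        query = buildQuery();
    }

    public void afterSchema() {
        query = buildQuery();
    }
}

/** Hypothetical dispatcher: shows how a name-based after[PropertyName]() convention can be resolved. */
class AfterTriggerDispatcher {

    /** "tableId" -> "afterTableId", "schema" -> "afterSchema". */
    static String triggerName(String propertyName) {
        return "after" + Character.toUpperCase(propertyName.charAt(0)) + propertyName.substring(1);
    }

    /** Calls the public no-argument trigger if the target declares one; returns whether it did. */
    static boolean fireAfter(Object target, String propertyName) {
        Method trigger;
        try {
            trigger = target.getClass().getMethod(triggerName(propertyName));
        } catch (NoSuchMethodException e) {
            return false; // no trigger for this property, nothing to do
        }
        try {
            trigger.invoke(target);
            return true;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Trigger failed for property " + propertyName, e);
        }
    }

    public static void main(String[] args) {
        TableQueryModel table = new TableQueryModel();
        table.schemaFieldNames.add("Name");
        table.schemaFieldNames.add("Age");
        table.tableId = "myTableId"; // hypothetical Table Id
        fireAfter(table, "tableId");
        System.out.println(table.query); // SELECT Name,Age FROM myTableId
    }
}
```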

Binding the properties

Define the ComponentProperties class to bind the connection and the table properties together.

This class should also provide a connector. The component uses this connector to get the data schema and pass it to the other components in the Job.

Procedure

  1. In the components-google-definition folder, open class org.talend.components.google.tgooglefusiontableinput.TGoogleFusionTableInputProperties.
    Remove the default code and add:
    import java.util.Set;

    import org.talend.components.api.component.PropertyPathConnector;
    import org.talend.components.common.FixedConnectorsComponentProperties;

    public class TGoogleFusionTableInputProperties extends FixedConnectorsComponentProperties {
        /**
         * Constructor sets properties name
         *
         * @param name name of this properties instance
         */
        public TGoogleFusionTableInputProperties(String name) {
            super(name);
        }

        @Override
        protected Set<PropertyPathConnector> getAllSchemaPropertiesConnectors(boolean isOutputConnection) {
            return null;
        }
    }
  2. Fill this class with GoogleFusionTableConnectionProperties and GoogleFusionTableProperties as fields.
    1. Import the dependencies.
      import org.talend.components.google.connection.GoogleFusionTableConnectionProperties;
      import org.talend.components.google.table.GoogleFusionTableProperties;
    2. Add the connection properties and the table properties.
      private static final long serialVersionUID = -1908527018782175776L;
      public final GoogleFusionTableConnectionProperties connectionProperties = new GoogleFusionTableConnectionProperties("connectionProperties");
          
      public final GoogleFusionTableProperties tableProperties = new GoogleFusionTableProperties("tableProperties");

      This way, the default values of the properties and the triggers defined in GoogleFusionTableConnectionProperties and GoogleFusionTableProperties are all applied in this class.

  3. Implement the setupLayout() method for TGoogleFusionTableInputProperties class.
    After adding import org.talend.daikon.properties.presentation.Form;, add the code below:
    @Override
    public void setupLayout() {
      super.setupLayout();
            
      Form mainForm = new Form(this, Form.MAIN);
      mainForm.addRow(connectionProperties.getForm(Form.MAIN));
      mainForm.addRow(tableProperties.getForm(Form.MAIN));
    }
    • The MAIN forms from the connection and the table properties are added to the MAIN form of TGoogleFusionTableInputProperties.

  4. From resources, open tgooglefusiontableinput/TGoogleFusionTableInputProperties.properties to define the display names of the forms.
    form.Main.title=Main
    form.Main.displayName=Main
  5. Implement the connector that transfers the schema from tGoogleFusionTableInput to the other components in a Job. In a Talend Job design, this connector defines the link between tGoogleFusionTableInput and the other components.
    1. Import dependencies
      import java.util.Collections;
      import org.talend.components.api.component.Connector;
    2. Add the field that defines the type of the connector being declared and specifies the path to SchemaProperties.
      private final transient PropertyPathConnector mainOutgoingConnector = new PropertyPathConnector(Connector.MAIN_NAME, "tableProperties.tableSchema");
    3. Implement the getAllSchemaPropertiesConnectors() method to return outgoing connectors from tGoogleFusionTableInput to the other components.
      /**
       * Returns set of incoming or outgoing connectors
       *
       * @param isOutputConnectors true if outgoing connectors are requested, false for incoming ones
       * @return set of incoming or outgoing connectors
       */
      @Override
      protected Set<PropertyPathConnector> getAllSchemaPropertiesConnectors(boolean isOutputConnectors) {
          if (isOutputConnectors) {
              return Collections.singleton(mainOutgoingConnector);
          } else {
              return Collections.emptySet();
          }
      }
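The path "tableProperties.tableSchema" given to PropertyPathConnector names the SchemaProperties instance by walking the nested properties from TGoogleFusionTableInputProperties: first the tableProperties field, then its tableSchema field. The following stdlib-only sketch makes that walk explicit with reflection over public fields. PropertyPathResolver and the *Stub classes are hypothetical stand-ins; this illustrates what the path refers to and is not how the framework resolves it.

```java
import java.lang.reflect.Field;

/**
 * Hypothetical helper: resolves a dot-separated property path such as
 * "tableProperties.tableSchema" by walking public fields from a root object.
 */
class PropertyPathResolver {

    static Object resolve(Object root, String path) {
        Object current = root;
        for (String segment : path.split("\\.")) {
            try {
                Field field = current.getClass().getField(segment); // public fields only
                current = field.get(current);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new IllegalArgumentException("Cannot resolve \"" + segment + "\" in path \"" + path + "\"", e);
            }
            if (current == null) {
                throw new IllegalArgumentException("\"" + segment + "\" is null in path \"" + path + "\"");
            }
        }
        return current;
    }
}

// Hypothetical stand-ins mirroring how TGoogleFusionTableInputProperties nests its properties.
class SchemaPropertiesStub {
}

class TablePropertiesStub {
    public final SchemaPropertiesStub tableSchema = new SchemaPropertiesStub();
}

class InputPropertiesStub {
    public final TablePropertiesStub tableProperties = new TablePropertiesStub();
}
```

This also shows why a typo in the path string is easy to miss: it is only checked when the path is resolved, not when the code compiles.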

Developing the component runtime

Develop the component runtime so that the component can connect to the data source, and convert and transfer data in a Talend Job.
For the example component, tGoogleFusionTableInput, the following classes should be implemented:
  • Source (extends SourceOrSink): represents the connection to the external data source and is responsible for creating the Reader.

  • Reader: reads data from the data source and converts it to Talend data formats.
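The division of work between Source and Reader can be sketched without any Talend dependency. The interfaces below are hypothetical and only modeled on the start()/advance()/getCurrent() iteration style (similar to Apache Beam's readers); check the framework's own Reader interface for the real signatures. An in-memory list stands in for Google Fusion Tables.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical reader contract in the start()/advance()/getCurrent() style. */
interface SketchReader<T> extends AutoCloseable {
    boolean start() throws IOException;   // moves to the first record; false if there is none

    boolean advance() throws IOException; // moves to the next record; false when exhausted

    T getCurrent();                       // the record the reader is positioned on

    @Override
    void close() throws IOException;
}

/** Hypothetical source: owns the "connection" (here an in-memory list) and creates readers. */
class InMemorySource {
    private final List<String> rows;

    InMemorySource(List<String> rows) {
        this.rows = rows;
    }

    SketchReader<String> createReader() {
        return new SketchReader<String>() {
            private Iterator<String> iterator;
            private String current;
            private boolean positioned;

            @Override
            public boolean start() {
                iterator = rows.iterator();
                return advance();
            }

            @Override
            public boolean advance() {
                positioned = iterator.hasNext();
                current = positioned ? iterator.next() : null;
                return positioned;
            }

            @Override
            public String getCurrent() {
                if (!positioned) {
                    throw new NoSuchElementException("No current record");
                }
                return current;
            }

            @Override
            public void close() {
                iterator = null; // a real reader would release its connection here
            }
        };
    }

    /** Typical consumption loop: start once, then advance until it returns false. */
    static List<String> readAll(InMemorySource source) throws IOException {
        List<String> out = new ArrayList<>();
        try (SketchReader<String> reader = source.createReader()) {
            for (boolean more = reader.start(); more; more = reader.advance()) {
                out.add(reader.getCurrent());
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll(new InMemorySource(Arrays.asList("row1", "row2")))); // [row1, row2]
    }
}
```

Keeping connection handling in the source and record iteration in the reader is what lets the source create several readers if the data is later split into bundles.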

Defining the Source class

The Source class implements either the BoundedSource interface, for a finite amount of data, or the UnboundedSource interface, for an infinite amount of data.

As a Google Fusion Table holds a finite amount of data, the BoundedSource interface should be implemented.

Procedure

  1. In the components-google-runtime folder, create class org.talend.components.google.runtime.reader.TGoogleFusionTableInputSource.
    import java.io.IOException;
    import java.security.GeneralSecurityException;
    import java.util.List;
     
    import org.apache.avro.Schema;
    import org.talend.components.api.component.runtime.BoundedSource;
    import org.talend.components.api.container.RuntimeContainer;
    import org.talend.components.api.properties.ComponentProperties;
    import org.talend.daikon.NamedThing;
    import org.talend.daikon.properties.ValidationResult;
    
     
    public class TGoogleFusionTableInputSource implements BoundedSource {
     
     @Override
     public ValidationResult initialize(RuntimeContainer container, ComponentProperties properties) {
       return null;
     }
         
     @Override
     public ValidationResult validate(RuntimeContainer container) {
       return null;
     }
         
     @Override
     public BoundedReader createReader(RuntimeContainer adaptor) {
       return null;
     }
          
     @Override
     public List<NamedThing> getSchemaNames(RuntimeContainer container) throws IOException {
         return null;
     }
     
     @Override
     public Schema getEndpointSchema(RuntimeContainer container, String schemaName) throws IOException {
       return null;
     }
     
     @Override
     public List<? extends BoundedSource> splitIntoBundles(long desiredBundleSizeBytes, RuntimeContainer adaptor)
       throws Exception {
       return null;
     }
     
     @Override
     public long getEstimatedSizeBytes(RuntimeContainer adaptor) {
       return 0;
     }
     
     @Override
     public boolean producesSortedKeys(RuntimeContainer adaptor) {
       return false;
     }
    }

    At this stage, this class contains only method stubs.

  2. Implement the initialize() method for TGoogleFusionTableInputSource.
    1. Import dependencies:
      import org.talend.components.google.tgooglefusiontableinput.TGoogleFusionTableInputProperties;
      import org.talend.daikon.properties.ValidationResult.Result;
    2. Implement initialize() to store and check the ComponentProperties class by updating the corresponding template. In this example, the ComponentProperties class is the previously defined TGoogleFusionTableInputProperties class.
      private TGoogleFusionTableInputProperties properties;
       
      @Override
      public ValidationResult initialize(RuntimeContainer container, ComponentProperties properties) {
        assert properties instanceof TGoogleFusionTableInputProperties;
        this.properties = (TGoogleFusionTableInputProperties) properties;
        if (isCredentialsSet()) {
            return ValidationResult.OK;
        } else {
            return new ValidationResult(Result.ERROR, "Client Id or/and Client Secret wasn't set");
        }
      }
      /**
      * Checks whether credentials were set. If they are empty, method returns false
      * 
      * @return true when credentials are set, false - otherwise
      */
      private boolean isCredentialsSet() {
        String clientId = properties.connectionProperties.clientId.getValue();
        if (clientId == null || clientId.trim().isEmpty()) {
           return false;
        }
        String clientSecret = properties.connectionProperties.clientSecret.getValue();
        if (clientSecret == null || clientSecret.trim().isEmpty()) {
            return false;
        }
        return true;
      }

      This method verifies that the Google credentials (the client id and the client secret) have been set.

  3. Implement the validate() method for TGoogleFusionTableInputSource.
    1. Open org.talend.components.google.tgooglefusiontableinput.TGoogleFusionTableInputProperties and add getter methods for the credential values.
       /**
       * Returns Client Id value
       * 
       * @return Client Id value
       */
       public String getClientId() {
         return connectionProperties.clientId.getValue();
       }
           
       /**
       * Returns Client Secret value
       * 
       * @return Client Secret value
       */
       public String getClientSecret() {
         return connectionProperties.clientSecret.getValue();
       }
    2. Open components-google-runtime/pom.xml and add the artifacts required for Google authentication.
      <dependency>
        <groupId>com.google.http-client</groupId>
        <artifactId>google-http-client-jackson2</artifactId>
        <version>1.22.0</version>
      </dependency>
      <dependency>
        <groupId>com.google.oauth-client</groupId>
        <artifactId>google-oauth-client-jetty</artifactId>
        <version>1.22.0</version>
      </dependency>
    3. Create a separate class, org.talend.components.google.runtime.reader.FusionTablesCreator, to establish the connection to Google Fusion Tables.
      package org.talend.components.google.runtime.reader;
       
      import java.io.IOException;
      import java.security.GeneralSecurityException;
      import java.util.Collections;
       
      import com.google.api.client.auth.oauth2.Credential;
      import com.google.api.client.extensions.java6.auth.oauth2.AuthorizationCodeInstalledApp;
      import com.google.api.client.extensions.jetty.auth.oauth2.LocalServerReceiver;
      import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow;
      import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
      import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
      import com.google.api.client.http.HttpTransport;
      import com.google.api.client.json.JsonFactory;
      import com.google.api.client.json.jackson2.JacksonFactory;
      import com.google.api.client.util.store.DataStoreFactory;
      import com.google.api.client.util.store.MemoryDataStoreFactory;
      import com.google.api.services.fusiontables.Fusiontables;
      import com.google.api.services.fusiontables.FusiontablesScopes;
       
      /**
      * Creates instance of {@link Fusiontables}
      */
      class FusionTablesCreator {
       
      /**
      * Application name required by the Google Fusion Tables API.
      * It does not affect the connection.
      */
      private static final String APP_NAME = "GoogleFusionTablesComponent";
       
      private final String clientId;
       
      private final String clientSecret;
       
      /**
      * Constructor sets the required credentials
      * 
      * @param clientId OAuth 2.0 client id
      * @param clientSecret OAuth 2.0 client secret
      */
      FusionTablesCreator(String clientId, String clientSecret) {
        this.clientId = clientId;
        this.clientSecret = clientSecret;
      }
       
      /**
      * Creates {@link Fusiontables} instance
      * 
      * @return {@link Fusiontables} instance
      * @throws GeneralSecurityException
      * @throws IOException
      */
      Fusiontables createFusionTables() throws GeneralSecurityException, IOException {
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        DataStoreFactory dataStoreFactory = createDataStoreFactory();
        Credential credential = authorize(httpTransport, jsonFactory, dataStoreFactory);
        Fusiontables fusionTables = new Fusiontables.Builder(httpTransport, jsonFactory, credential).setApplicationName(APP_NAME)
           .build();
        return fusionTables;
      }
       
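      /**
      * Creates storage for received access and refresh tokens, so that subsequent connections
      * reuse the stored tokens instead of requiring user consent again. As MemoryDataStoreFactory
      * keeps the tokens in memory, they are reused only within the same JVM.
      * 
      * @return in-memory {@link DataStoreFactory}
      * @throws IOException
      */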
      private DataStoreFactory createDataStoreFactory() throws IOException {
         return MemoryDataStoreFactory.getDefaultInstance();
      }
       
      /**
      * Creates flow to get authorization code and tokens:
      * starts a local server to receive the authorization code from the Google server
      * and tries to open the system browser to get the user's consent
      * 
      * @param httpTransport
      * @param jsonFactory
      * @param dataStoreFactory
      * @return authorized {@link Credential}
      * @throws IOException
      */
      private Credential authorize(HttpTransport httpTransport, JsonFactory jsonFactory, DataStoreFactory dataStoreFactory)
          throws IOException {
        GoogleClientSecrets clientSecrets = createClientSecret();
        GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder(httpTransport, jsonFactory, clientSecrets,
            Collections.singleton(FusiontablesScopes.FUSIONTABLES)).setDataStoreFactory(dataStoreFactory).build();
        return new AuthorizationCodeInstalledApp(flow, new LocalServerReceiver()).authorize(clientId);
      }
       
      /**
      * Creates client secret instance from client id and client secret
      * 
      * @return GoogleClientSecrets
      */
      private GoogleClientSecrets createClientSecret() {
        GoogleClientSecrets.Details details = new GoogleClientSecrets.Details();
        details.setClientId(clientId);
        details.setClientSecret(clientSecret);
        GoogleClientSecrets clientSecrets = new GoogleClientSecrets();
        clientSecrets.setInstalled(details);
        return clientSecrets;
      }
       
      }
    4. In TGoogleFusionTableInputSource, add the imports com.google.api.services.fusiontables.Fusiontables and java.security.GeneralSecurityException, then implement validate() to check the connection to Google Fusion Tables by updating the corresponding template.
      private transient Fusiontables fusionTables;
      
      @Override
      public ValidationResult validate(RuntimeContainer container) {
        try {
          getConnection();
        } catch (IOException | GeneralSecurityException e) {
          return new ValidationResult(Result.ERROR, "Error during connection establishment");
        }
        return ValidationResult.OK;
      }
      
      /**
      * Returns Fusion Tables connection. Creates it, if it doesn't exist yet
      * 
      * @return Fusion Tables connection
      * @throws GeneralSecurityException
      * @throws IOException
      */
      Fusiontables getConnection() throws GeneralSecurityException, IOException {
        if (fusionTables == null) {
          // FusionTablesCreator takes only the client id and the client secret
          fusionTables = new FusionTablesCreator(properties.getClientId(), properties.getClientSecret())
              .createFusionTables();
        }
        return fusionTables;
      }
  4. Deactivate the schema discovery features by updating the corresponding template.
    @Override
    public List<NamedThing> getSchemaNames(RuntimeContainer container) throws IOException {
      throw new UnsupportedOperationException();
    }
      
    @Override
    public Schema getEndpointSchema(RuntimeContainer container, String schemaName) throws IOException {
      throw new UnsupportedOperationException();
    }
  5. After adding import java.util.Arrays;, set up the default implementation of the methods specific to running in distributed environments: splitIntoBundles(), getEstimatedSizeBytes() and producesSortedKeys().
    @Override
    public List<? extends BoundedSource> splitIntoBundles(long desiredBundleSizeBytes, RuntimeContainer adaptor)
        throws Exception {
      // This source is never split: it is returned as the only bundle
      return Arrays.asList(this);
    }
      
    @Override
    public long getEstimatedSizeBytes(RuntimeContainer adaptor) {
      // This value is ignored, as the source is never split
      return 0;
    }
      
    @Override
    public boolean producesSortedKeys(RuntimeContainer adaptor) {
      // This source does not produce sorted keys
      return false;
    }

    This tutorial does not cover running this component in a distributed environment. If you are interested in these methods, see Distributed environments.
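The validation logic of the Source boils down to two ideas: reject blank credentials early, and create the expensive connection only once. The following is a minimal, dependency-free sketch of that pattern, not Talend or Google API code. The class LazyConnectionSketch, its methods and the Supplier used as a stand-in for new FusionTablesCreator(...).createFusionTables() are all hypothetical names chosen for illustration.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not Talend or Google API code) of the validation pattern
 * used by TGoogleFusionTableInputSource: credentials are checked up front, and
 * the expensive connection is created lazily on first use and then reused.
 */
public class LazyConnectionSketch {

    /** Same check as isCredentialsSet(): a value counts as set if it is not null or blank. */
    static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    /** Both credentials must be set, as in initialize(). */
    static boolean credentialsSet(String clientId, String clientSecret) {
        return isSet(clientId) && isSet(clientSecret);
    }

    /** Stand-in for new FusionTablesCreator(...).createFusionTables(). */
    private final Supplier<Object> connectionFactory;

    /** Cached connection, like the transient fusionTables field. */
    private Object connection;

    /** Counts how many connections were actually created (for demonstration only). */
    private int creations;

    LazyConnectionSketch(Supplier<Object> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /** Same shape as getConnection(): create on the first call, reuse afterwards. */
    Object getConnection() {
        if (connection == null) {
            connection = connectionFactory.get();
            creations++;
        }
        return connection;
    }

    int creations() {
        return creations;
    }

    public static void main(String[] args) {
        System.out.println(credentialsSet("my-client-id", "my-secret")); // true
        System.out.println(credentialsSet("my-client-id", "   "));       // false

        LazyConnectionSketch source = new LazyConnectionSketch(Object::new);
        Object first = source.getConnection();
        Object second = source.getConnection();
        System.out.println(first == second);     // true
        System.out.println(source.creations());  // 1
    }
}
```

Because validate() only calls getConnection(), the connection it creates is cached and reused later by the Reader, so the user is asked for consent once per Source instance.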

Implementing Avro data converter

The Talend Component Kit uses the IndexedRecord interface from the Apache Avro library to describe the data to be processed.

This means that tGoogleFusionTableInput needs to convert the data from Google Fusion Tables to Avro format so that other components can handle it.

Procedure

  1. Analyze the source data.

    Google Fusion Tables supports four data types: Text, Number, Location and Date/Time, but its API library returns almost all values as String, and floating-point numbers as BigDecimal.

    Therefore, the converters need to rely on the data schema to determine the actual logical type of each value: for example, that "123" is a Java Integer and "2015-04-05" is a timestamp.

  2. Create the class org.talend.components.google.avro.RowConverter to handle the incoming data of the String type. The API returns every record as a List<Object>, and RowConverter converts each whole record to an IndexedRecord.

    A converter that converts an entire record is called a 1st level converter in the Talend Component Kit. Very often, only this level of converter is needed.

    package org.talend.components.google.avro;
     
     import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
     
     import org.apache.avro.Schema;
     import org.apache.avro.Schema.Field;
     import org.apache.avro.Schema.Type;
     import org.apache.avro.generic.GenericData;
     import org.apache.avro.generic.IndexedRecord;
     import org.talend.daikon.avro.AvroUtils;
     import org.talend.daikon.avro.LogicalTypeUtils;
     import org.talend.daikon.avro.SchemaConstants;
     import org.talend.daikon.avro.converter.AbstractAvroConverter;
     import org.talend.daikon.avro.converter.AvroConverter;
     import org.talend.daikon.avro.converter.string.StringBooleanConverter;
     import org.talend.daikon.avro.converter.string.StringConverter;
     import org.talend.daikon.avro.converter.string.StringIntConverter;
     import org.talend.daikon.avro.converter.string.StringLongConverter;
     import org.talend.daikon.avro.converter.string.StringStringConverter;
     import org.talend.daikon.avro.converter.string.StringTimestampConverter;
     
     /**
      * Converts data row as List<Object> to {@link IndexedRecord} using schema to guess value type
      */
     @SuppressWarnings("rawtypes")
     public class RowConverter extends AbstractAvroConverter<List, IndexedRecord> {
     
         /**
          * Contains available {@link StringConverter}. Avro type is used as a key
          * However datum class could be also used as key. It depends on what data
          * mapping is required for particular component family. There might be
          * situations when several datum classes are mapped to the same avro type.
          * This is the case to use datum class as a key
          */
         private static final Map<Type, AvroConverter> converterRegistry;
     
         /**
          * Stores converters. Array index corresponds to field index
          */
         private AvroConverter[] converters;
     
         /**
          * Fill in converter registry
          */
         static {
             converterRegistry = new HashMap<>();
             converterRegistry.put(Type.BOOLEAN, new StringBooleanConverter());
             converterRegistry.put(Type.DOUBLE, new BigDecimalDoubleConverter());
             converterRegistry.put(Type.INT, new StringIntConverter());
             converterRegistry.put(Type.LONG, new StringLongConverter());
             converterRegistry.put(Type.STRING, new StringStringConverter());
         }
     
         /**
          * Constructor sets outgoing record schema and {@link List} class as datum class
          * 
          * @param schema
          * outgoing record schema
          */
         public RowConverter(Schema schema) {
             super(List.class, schema);
             initConverters(schema);
         }
     
         /**
          * Initialize converters per each schema field
          * 
          * @param schema
          * design schema
          */
         private void initConverters(Schema schema) {
             converters = new AvroConverter[schema.getFields().size()];
             List<Field> fields = schema.getFields();
             for (int i = 0; i < schema.getFields().size(); i++) {
                 Field field = fields.get(i);
                 Schema fieldSchema = field.schema();
                 fieldSchema = AvroUtils.unwrapIfNullable(fieldSchema);
                 if (LogicalTypeUtils.isLogicalTimestampMillis(fieldSchema)) {
                     String datePattern = field.getProp(SchemaConstants.TALEND_COLUMN_PATTERN);
                     converters[i] = new StringTimestampConverter(datePattern);
                 } else {
                     Type type = fieldSchema.getType();
                     converters[i] = converterRegistry.get(type);
                 }
             }
         }
     
         /**
          * @throws UnsupportedOperationException as this method is not supported yet
          */
         @Override
         public List<Object> convertToDatum(IndexedRecord value) {
             throw new UnsupportedOperationException();
         }
     
         @SuppressWarnings("unchecked")
         @Override
         public IndexedRecord convertToAvro(List row) {
             IndexedRecord record = new GenericData.Record(getSchema());
             for (int i = 0; i < row.size(); i++) {
                 Object value = converters[i].convertToAvro(row.get(i));
                 record.put(i, value);
             }
             return record;
         }
     
     }
    • This converter leverages many existing String converters in the Component Kit. You can find them in the org.talend.daikon.avro.converter.string package in the Daikon dependency.

    • A converter usually implements four methods:

      getSchema()

      Returns the Avro schema of the produced data: the record schema for 1st level converters, or the field type for 2nd level converters. In other words, it describes the type of the converted Avro object.

      It has a general implementation in the AbstractAvroConverter class.

      getDatumClass()

      Returns the class of the source data (the datum class).

      It has a general implementation in the AbstractAvroConverter class.

      convertToDatum()

      Converts data from Avro types to other types. As tGoogleFusionTableInput is a source component that only reads data and converts it to Avro, this method is not implemented in this tutorial.

      convertToAvro()

      Converts data from other types to Avro types.

  3. Create class org.talend.components.google.avro.BigDecimalDoubleConverter to handle the incoming data of the BigDecimal type.

    This is a 2nd level converter: rather than a whole record, it converts a single BigDecimal field value to the Avro double type.

    package org.talend.components.google.avro;
     
     import java.math.BigDecimal;
     
     import org.talend.daikon.avro.AvroUtils;
     import org.talend.daikon.avro.converter.AbstractAvroConverter;
     
     /**
      * Converts {@link BigDecimal} data to Avro compatible {@link Double} type
      */
     public class BigDecimalDoubleConverter extends AbstractAvroConverter<BigDecimal, Double> {
     
     /**
     * Sets Double Avro schema and {@link BigDecimal} type as datum class
     */
     public BigDecimalDoubleConverter() {
       super(BigDecimal.class, AvroUtils._double());
     }
     
     @Override
     public BigDecimal convertToDatum(Double value) {
       throw new UnsupportedOperationException();
     }
     
     @Override
     public Double convertToAvro(BigDecimal value) {
       // Guard against null values instead of failing with a NullPointerException
       return value == null ? null : value.doubleValue();
     }
    }
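The core idea of RowConverter is to choose one converter per field from the schema once, then apply the converters position by position to every row. The following is a minimal sketch of that idea using only the JDK, under the assumption that a hypothetical FieldType enum stands in for Avro's Schema.Type and a plain List stands in for the IndexedRecord; it does not use the Daikon converters themselves.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical, dependency-free sketch of the idea behind RowConverter.
 * FieldType stands in for Avro's Schema.Type; the returned List stands in
 * for the IndexedRecord.
 */
public class RowConverterSketch {

    enum FieldType { BOOLEAN, INT, LONG, DOUBLE, STRING }

    /** Registry keyed by target type, like converterRegistry in RowConverter. */
    private static final Map<FieldType, Function<Object, Object>> REGISTRY = new EnumMap<>(FieldType.class);

    static {
        REGISTRY.put(FieldType.BOOLEAN, v -> Boolean.valueOf((String) v));
        REGISTRY.put(FieldType.INT, v -> Integer.valueOf((String) v));
        REGISTRY.put(FieldType.LONG, v -> Long.valueOf((String) v));
        // Field-level conversion: the API returns floating-point numbers as BigDecimal
        REGISTRY.put(FieldType.DOUBLE, v -> ((BigDecimal) v).doubleValue());
        REGISTRY.put(FieldType.STRING, v -> v);
    }

    /** One converter per schema field; list index corresponds to field index. */
    private final List<Function<Object, Object>> converters = new ArrayList<>();

    RowConverterSketch(List<FieldType> schema) {
        for (FieldType type : schema) {
            converters.add(REGISTRY.get(type));
        }
    }

    /** Converts one row, applying the converter of each field position. */
    List<Object> convert(List<Object> row) {
        List<Object> record = new ArrayList<>(row.size());
        for (int i = 0; i < row.size(); i++) {
            record.add(converters.get(i).apply(row.get(i)));
        }
        return record;
    }

    public static void main(String[] args) {
        RowConverterSketch converter = new RowConverterSketch(
                Arrays.asList(FieldType.STRING, FieldType.INT, FieldType.DOUBLE));
        List<Object> row = Arrays.<Object>asList("Paris", "123", new BigDecimal("2.5"));
        System.out.println(converter.convert(row)); // [Paris, 123, 2.5]
    }
}
```

Resolving the converters in the constructor, as RowConverter does in initConverters(), means the schema is inspected once per Reader rather than once per row.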

Defining the Reader class

The Reader class reads data from the source and converts it to Avro format. It implements either the BoundedReader interface, for a finite amount of data, or the UnboundedReader interface, for an infinite amount of data.

As a Google Fusion Table contains a finite amount of data, the BoundedReader interface should be implemented.
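A bounded reader is consumed with a simple loop: start() positions the reader on the first record, advance() on each following one, and both return false once the data is exhausted. The following sketch shows that calling convention with the JDK only. SimpleBoundedReader and ListReader are hypothetical stand-ins written for illustration; they are not the Talend BoundedReader interface, although the method names mirror the ones implemented in this procedure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch of how a caller drives a bounded reader.
 * SimpleBoundedReader is a stand-in, not the Talend BoundedReader interface.
 */
public class BoundedReaderLoopSketch {

    interface SimpleBoundedReader<T> extends AutoCloseable {
        boolean start() throws IOException;
        boolean advance() throws IOException;
        T getCurrent() throws NoSuchElementException;
        @Override
        void close() throws IOException;
    }

    /** Finite, list-backed reader for the demonstration; does not support null elements. */
    static final class ListReader<T> implements SimpleBoundedReader<T> {
        private final Iterator<T> source;
        private T current;

        ListReader(List<T> data) {
            this.source = data.iterator();
        }

        @Override
        public boolean start() {
            return advance(); // position on the first record, if any
        }

        @Override
        public boolean advance() {
            if (!source.hasNext()) {
                current = null;
                return false;
            }
            current = source.next();
            return true;
        }

        @Override
        public T getCurrent() {
            if (current == null) {
                throw new NoSuchElementException();
            }
            return current;
        }

        @Override
        public void close() {
            current = null;
        }
    }

    /** Reads every record with the start()/advance() loop, then closes the reader. */
    static <T> List<T> readAll(SimpleBoundedReader<T> reader) throws IOException {
        List<T> records = new ArrayList<>();
        try (SimpleBoundedReader<T> r = reader) {
            for (boolean available = r.start(); available; available = r.advance()) {
                records.add(r.getCurrent());
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll(new ListReader<>(Arrays.asList("a", "b", "c")))); // [a, b, c]
    }
}
```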

Procedure

  1. In the components-google-runtime folder, create the class org.talend.components.google.runtime.reader.TGoogleFusionTableInputReader.
    package org.talend.components.google.runtime.reader;
     
    import java.io.IOException;
    import java.util.Map;
    import java.util.NoSuchElementException;
     
    import org.apache.avro.generic.IndexedRecord;
    import org.talend.components.api.component.runtime.AbstractBoundedReader;
    import org.talend.components.api.component.runtime.BoundedReader;
    import org.talend.components.api.component.runtime.BoundedSource;
     
    /**
    * Reader for Google Fusion Tables. Reads rows of specified table
    */
    public class TGoogleFusionTableInputReader extends AbstractBoundedReader<IndexedRecord> implements BoundedReader<IndexedRecord> {
     
    /**
    * Constructor sets {@link Source} of this {@link Reader}
    * 
    * @param source {@link Source} of this {@link Reader}
    */
    protected TGoogleFusionTableInputReader(BoundedSource source) {
      super(source);
      }
     
    @Override
    public boolean start() throws IOException {
      return false;
      }
     
    @Override
    public boolean advance() throws IOException {
      return false;
      }
     
    @Override
      public IndexedRecord getCurrent() throws NoSuchElementException {
        return null;
      }
     
    @Override
    public Map<String, Object> getReturnValues() {
       return null;
       }
    }

    At this point, this code contains only method templates.

  2. Override the getCurrentSource() method so that it returns the concrete Source class, TGoogleFusionTableInputSource in this example.
    1. Change the constructor protected TGoogleFusionTableInputReader(BoundedSource source) { super(source); } to protected TGoogleFusionTableInputReader(TGoogleFusionTableInputSource source) { super(source); }, so that this Reader can only be created from a TGoogleFusionTableInputSource.

      As AbstractBoundedReader.getCurrentSource() returns a BoundedSource, you would otherwise need to cast its result each time you call it. Restricting the constructor parameter guarantees that the single cast performed in getCurrentSource() is always safe.

    2. Implement the getCurrentSource() method, narrowing its return type to TGoogleFusionTableInputSource.
      @Override
      public TGoogleFusionTableInputSource getCurrentSource() {
        return (TGoogleFusionTableInputSource) super.getCurrentSource();
      }
  3. Implement the start() method of TGoogleFusionTableInputReader to create a connection to Google Fusion Tables, read the data and move the Reader to the first data record.
    1. Open TGoogleFusionTableInputSource and add the getQuery() method to it.
      String getQuery() {
        return properties.tableProperties.query.getValue();
      }
    2. Import the new dependencies into TGoogleFusionTableInputReader.
      import java.security.GeneralSecurityException;
      import java.util.Iterator;
      import java.util.List;
        
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.talend.components.api.component.runtime.Reader;
      import org.talend.components.api.component.runtime.Source;
       
      import com.google.api.services.fusiontables.Fusiontables;
      import com.google.api.services.fusiontables.Fusiontables.Query.Sql;
      import com.google.api.services.fusiontables.model.Sqlresponse;
    3. In TGoogleFusionTableInputReader, implement start() by updating the corresponding template.
      private static final Logger LOGGER = LoggerFactory.getLogger(TGoogleFusionTableInputReader.class);
       
      private Iterator<List<Object>> rows;
       
      /**
      * Represents state of this Reader: whether it was started or not
      */
      private boolean started = false;
      
      @Override
      public boolean start() throws IOException {
        try {
          Fusiontables fusionTables = getCurrentSource().getConnection();
          String query = getCurrentSource().getQuery();
          Sql sql = fusionTables.query().sql(query);
          LOGGER.debug("execute query: {}", query);
          Sqlresponse response = sql.execute();
          List<List<Object>> responseRows = response.getRows();
          // getRows() may return null when the query returns no rows
          rows = responseRows != null ? responseRows.iterator() : java.util.Collections.<List<Object>> emptyIterator();
          started = true;
          return advance();
        } catch (GeneralSecurityException e) {
          throw new IOException(e);
        }
      }
      • The method returns true if a data record is available in the source system; otherwise, it returns false.

      • The method throws IOException in case of errors when connecting to the source or reading data. A GeneralSecurityException raised while creating the connection is wrapped in an IOException.

      • As the Google Fusion Tables API returns multiple data rows in a single response, these rows are cached in the Reader class for later use in the advance() method.

      In the code, the method gets the connection instance, executes the SQL query, caches the returned rows as an iterator and marks the Reader as started. It then calls advance() to move to the first data row and convert it to an IndexedRecord.

  4. Implement the advance() method, which uses the data converter.
    1. Open the TGoogleFusionTableInputSource class and add the getDesignSchema() method.
      Schema getDesignSchema() {
        return properties.tableProperties.tableSchema.schema.getValue();
      }
    2. In the TGoogleFusionTableInputReader class, import the new dependency.
      import org.talend.components.google.avro.RowConverter;
    3. Implement advance().
      private IndexedRecord currentRecord;
      
      /**
      * Converts row retrieved from data source to Avro format {@link IndexedRecord}
      */
      private RowConverter converter;
      
      /**
      * Represents state of this Reader: whether it has more records
      */
      private boolean hasMore = false;
      
      @Override
      public boolean advance() throws IOException {
        if (!started) {
           throw new IllegalStateException("Reader wasn't started");
        }
        hasMore = rows.hasNext();
        if (hasMore) {
           List<Object> row = rows.next();
           currentRecord = getConverter().convertToAvro(row);
        }
        return hasMore;
      }
      
      /**
      * Returns implementation of {@link AvroConverter}, creates it if it doesn't
      * exist.
      * 
      * @return converter
      */
      private RowConverter getConverter() {
        if (converter == null) {
           converter = new RowConverter(getCurrentSource().getDesignSchema());
        }
         return converter;
      }
      • The started flag declared in the previous step is checked first: if the Reader has not been started, advance() throws an IllegalStateException.

      • The advance() method checks whether there are still data rows. It returns true if there are still rows to come or false if there are no more.

      • If there is another data row, the method takes it, converts it to an IndexedRecord using RowConverter and caches the resulting IndexedRecord.

      • The getConverter() method creates the converter only once (when the first row is converted) and then reuses it.

  5. Implement the getCurrent() method to return the current IndexedRecord.
    @Override
    public IndexedRecord getCurrent() throws NoSuchElementException {
      if (!started) {
        throw new NoSuchElementException("Reader wasn't started");
      }
      if (!hasMore) {
        throw new NoSuchElementException("Has no more elements");
      }
      return currentRecord;
    }

    As consecutive calls to this method without an intervening call to advance() must return the same record, the IndexedRecord is cached in currentRecord rather than recomputed.

  6. Implement close() to set the Reader to the closed state and release the cached data.
    @Override
    public void close() {
       started = false;
       currentRecord = null;
       converter = null;
       rows = null;
    }
  7. Now that the TGoogleFusionTableInputReader class has been properly implemented, implement createReader() in TGoogleFusionTableInputSource.
    @Override
    public TGoogleFusionTableInputReader createReader(RuntimeContainer adaptor) {
      return new TGoogleFusionTableInputReader(this);
    }

    This method calls the Reader constructor and passes it a reference to the Source itself (this).
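The Reader implemented above is essentially a small state machine: rows are fetched once in start(), each row is converted and cached in advance(), and getCurrent() returns the cached value or throws NoSuchElementException. The following JDK-only sketch reproduces that state handling so it can be run and tested in isolation. CachingReaderSketch is a hypothetical class; the fetched row list stands in for the Fusion Tables query response and the Function stands in for RowConverter.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical, dependency-free sketch of the state handling in
 * TGoogleFusionTableInputReader. The row list stands in for the query
 * response, the Function for RowConverter.
 */
public class CachingReaderSketch<R, T> {

    private final List<R> fetchedRows;       // stand-in for the query result
    private final Function<R, T> converter;  // stand-in for RowConverter
    private Iterator<R> rows;
    private T currentRecord;
    private boolean started = false;
    private boolean hasMore = false;

    CachingReaderSketch(List<R> fetchedRows, Function<R, T> converter) {
        this.fetchedRows = fetchedRows;
        this.converter = converter;
    }

    public boolean start() {
        rows = fetchedRows.iterator(); // the tutorial executes the SQL query here
        started = true;
        return advance();              // move to the first record
    }

    public boolean advance() {
        if (!started) {
            throw new IllegalStateException("Reader wasn't started");
        }
        hasMore = rows.hasNext();
        if (hasMore) {
            currentRecord = converter.apply(rows.next()); // convert once, cache the result
        }
        return hasMore;
    }

    public T getCurrent() {
        if (!started) {
            throw new NoSuchElementException("Reader wasn't started");
        }
        if (!hasMore) {
            throw new NoSuchElementException("Has no more elements");
        }
        return currentRecord; // repeated calls return the same cached record
    }

    public void close() {
        started = false;
        hasMore = false;
        currentRecord = null;
        rows = null;
    }

    public static void main(String[] args) {
        CachingReaderSketch<String, Integer> reader =
                new CachingReaderSketch<>(Arrays.asList("1", "2"), Integer::valueOf);
        for (boolean available = reader.start(); available; available = reader.advance()) {
            System.out.println(reader.getCurrent());
        }
        reader.close();
    }
}
```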

Building the component in the Studio

Build your component, test it and create documentation for it.

If you are using a commercial edition of the Studio, you can define an automatic installation mechanism of your component. For further information, see Installing your custom component automatically (Deprecated).

Procedure

  1. Test your component. In both the components-google-definition and the components-google-runtime folders, test folders have been created automatically, in which you can write Java tests for each class.
  2. From the root folder of the project, my_components/components-google/ in this example, execute mvn clean install to build the component.
  3. Once the build is done, with your Studio closed, copy the component bundle (the *-bundle.jar file) of the definition part, components-google-definition-0.1.0-SNAPSHOT-bundle.jar in this example, into studio_installation/plugins.
  4. Open the studio_installation/configuration/config.ini file.
  5. Find the osgi.bundles property and append the file name of your component bundle to its value, preceded by a comma and followed by @4.

    For example, if the name of your bundle file is components-google-definition-0.1.0-SNAPSHOT-bundle.jar, append ,components-google-definition-0.1.0-SNAPSHOT-bundle.jar@4

    The version of the bundle is defined in the pom.xml file in the components-google-definition folder; you can change it there to manage the versions of your component. However, the version of the components-parent artifact must be the same as the version you used in the Archetype to create your component, that is to say, the component API version of the Studio to be used.

  6. In the configuration folder, remove any folders whose names start with org.eclipse.
  7. Copy the components-google-runtime folder from your local .m2/repository/org/talend/components/ to studio_installation/configuration/.m2/repository/org/talend/components.
    Tip: you can alternatively add maven.repository=global to the studio_installation/configuration/config.ini file to make the Studio use your local .m2 repository.
  8. Start the Studio and create a Job. tGoogleFusionTableInput is now listed in the Palette under the Cloud/Fusion Tables family.
  9. Test it by running it in a Job.
  10. Create documentation for this brand new component. For an example of Talend components documentation, see tSnowflakeInput.
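If you rebuild and reinstall the component often, the config.ini edit from step 5 can be scripted. The following is a hypothetical helper, a sketch only: it assumes the property is written on a single line exactly as osgi.bundles=... (no spaces around =, no backslash line continuations), appends ,<bundle file name>@4 as described in step 5, and leaves the line unchanged if the entry is already present. The sample value in main is made up for illustration.

```java
/**
 * Hypothetical helper for the config.ini edit described in step 5.
 * Assumes a single-line "osgi.bundles=..." entry without continuation lines.
 */
public class OsgiBundlesSketch {

    static String appendBundle(String line, String bundleFileName) {
        String key = "osgi.bundles=";
        if (!line.startsWith(key)) {
            return line; // not the osgi.bundles line: leave it untouched
        }
        String entry = bundleFileName + "@4";
        String value = line.substring(key.length());
        for (String existing : value.split(",")) {
            if (existing.trim().equals(entry)) {
                return line; // bundle already registered
            }
        }
        return value.isEmpty() ? key + entry : line + "," + entry;
    }

    public static void main(String[] args) {
        // Made-up sample value; a real config.ini lists many more bundles
        String line = "osgi.bundles=org.eclipse.equinox.common@2:start";
        System.out.println(appendBundle(line, "components-google-definition-0.1.0-SNAPSHOT-bundle.jar"));
    }
}
```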