Data integration is an urgent challenge for today’s businesses, driven by sharp growth in the volume and diversity of data generated by daily operations and by the need to make the best possible use of that data to advance the prospects of the business.
Clean, timely, and correct data is a business-critical resource for most organizations because it enables more accurate decision making, compliance, and improved efficiency.
Data integration is often the first point of contact for data arriving into a business (from third parties), and the hub for data held within a business, and as such, plays a key part in ensuring that data is fit for use.
This section concentrates on some of the features and methods within Talend that enable the developer to identify and capture invalid data so that it can be reported.
Data integration is all about moving data around, and a key aspect of this is the validation of your input data.
An organization might typically use Talend Open Studio for Data Integration for:
- synchronization or replication of databases
- right-time or batch exchanges of data
- ETL (Extract/Transform/Load) for analytics
- data migration
- complex data transformation and loading
- data quality exercises
- big data
Much of our data validation will be very basic, often consisting of a simple range check. Often, you will simply add these checks to a tMap mapping.
The tMap is one of the core components of Talend Studio and is used very often in Jobs. The tMap component is primarily used for mapping input fields to output fields and transforming the input data in the Expression Builder of the corresponding output column.
Function: tMap is an advanced component that integrates itself as a plugin to Talend Studio.
Purpose: tMap transforms and routes data from single or multiple sources to single or multiple destinations.
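To make the idea of a simple range check concrete, here is a minimal Java sketch. The class name, method name, and the bounds used are hypothetical and chosen only for illustration; in a real Job the same boolean logic would normally be typed inline into a tMap expression rather than placed in a separate class.

```java
// Hypothetical helper for illustration; not part of Talend.
final class RangeCheck {
    private RangeCheck() {}

    /** Returns true when value is non-null and lies within [min, max], inclusive. */
    static boolean inRange(Integer value, int min, int max) {
        return value != null && value >= min && value <= max;
    }

    public static void main(String[] args) {
        System.out.println(inRange(42, 0, 120));   // true
        System.out.println(inRange(-1, 0, 120));   // false
        System.out.println(inRange(null, 0, 120)); // false
    }
}
```

The explicit null test matters because comparing a null Integer with >= would otherwise fail with a NullPointerException when the value is unboxed.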
Enabling and disabling reject flows
Rejected data is closely coupled to schemas (see Metadata and Schemas), as many of the input and output components validate data against a schema definition and then pass any incorrect data to a reject flow.
Reject flows thus allow non-conforming data to be collected and handled according to the needs of a project.
In some cases, depending upon the business requirement, rejects are not acceptable. In these cases, reject flows should be disabled and the job allowed to fail.
Whether a job dies on the first incorrect record, collects rejects in a file, or completely ignores rejects is a design decision that should be based upon the requirements for the process. Where possible, designers and developers should attempt to define how errors and rejects are handled before coding begins.
Open the job jo_cook_ch03_0000_inputReject.
How to achieve it…
- Run the job and it will fail with an unparseable date error.
- Open the tFileInputDelimited component and, in the Basic settings tab, uncheck the Die on error option.
- Drag a new tLogRow to the canvas, open it and set the mode to Table.
- Right-click the tFileInputDelimited component, and select Row, then reject. Connect this row to the new tLogRow. Your job should look like the following:
- Run the job. You should see that two records have now been passed to the reject.
How it works…
When Talend reads an input data source, it attempts to parse the data into the schema. If it cannot parse the data, then it will fail with a Java error.
When the Die on error box is unchecked, Talend enables a reject flow to be added to the component and changes the action of the component, so that instead of killing the job, invalid rows are passed to a reject flow.
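The difference between the two behaviours can be sketched in plain Java. This is only an illustration of the idea, not the code Talend generates: the two-column schema (an integer id and an ISO date), the semicolon separator, and all class and method names are assumptions made for the example.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

final class InputRejectSketch {
    /** Hypothetical two-column schema: an integer id and an ISO date. */
    static final class Customer {
        final int id;
        final LocalDate joined;
        Customer(int id, LocalDate joined) { this.id = id; this.joined = joined; }
    }

    /** Holds the rows that parsed cleanly and the raw lines that did not. */
    static final class ReadResult {
        final List<Customer> main = new ArrayList<>();
        final List<String> rejects = new ArrayList<>();
    }

    /** Parses each line into the schema; bad lines either stop the run or go to rejects. */
    static ReadResult read(List<String> lines, boolean dieOnError) {
        ReadResult result = new ReadResult();
        for (String line : lines) {
            try {
                String[] fields = line.split(";", -1);
                int id = Integer.parseInt(fields[0].trim());
                LocalDate joined = LocalDate.parse(fields[1].trim(), DateTimeFormatter.ISO_LOCAL_DATE);
                result.main.add(new Customer(id, joined));
            } catch (NumberFormatException | DateTimeParseException | ArrayIndexOutOfBoundsException e) {
                if (dieOnError) {
                    // Equivalent in spirit to leaving Die on error checked.
                    throw new IllegalStateException("Unparseable row: " + line, e);
                }
                // Equivalent in spirit to the reject flow: keep the row and the reason.
                result.rejects.add(line + ";" + e.getMessage());
            }
        }
        return result;
    }
}
```

With dieOnError set to false, every line is read and the caller receives both lists; with it set to true, processing stops at the first bad line and later problems remain undiscovered.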
You can, if required, ignore any rejects by not attaching a reject flow, but it is wise to double check first if this is a genuine requirement for the process. Most cases of rejects being ignored are down to programmers forgetting to check if there is a reject flow for the given component.
In the tFileInputDelimited component, there is an Advanced tab that enables data to be validated against the schema and for dates to be checked. These options provide an added level of validation for the input data.
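As a rough illustration of why a stricter date check adds value, the following Java sketch rejects dates that have the right shape but do not exist in the calendar. It does not claim to reproduce what the Talend option does internally; the class name, method name, and the yyyy-MM-dd style pattern are assumptions for the example.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

final class StrictDateCheck {
    // "uuuu" (proleptic year) is used because strict resolution with "yyyy" also needs an era.
    private static final DateTimeFormatter STRICT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd").withResolverStyle(ResolverStyle.STRICT);

    private StrictDateCheck() {}

    /** Returns true only for text that matches the pattern and is a real calendar date. */
    static boolean isValidDate(String text) {
        if (text == null) {
            return false;
        }
        try {
            LocalDate.parse(text, STRICT);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

Under this sketch, a value such as 2023-02-30 is treated as invalid even though it looks like a date, which is the kind of error a purely format-based check can let through.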
It is always worth checking every input component for the presence of a reject flow when Die on error is unchecked, or for additional validation options.
In many cases, these validations will not be explicitly stated in a specification, so it is always worth checking with the customer to see if they require rejects and/or validation rules to be added.
Gathering all rejects prior to killing a job
As an alternative to collecting incorrect rows up to the point where a job fails (Die on error), you may wish to capture all rejects from an input before killing a job.
This has the advantage of enabling support personnel to identify all problems with source data in a single pass, rather than having to re-execute a job continually to find and fix a single error/set of errors at a time.
Open the job jo_cook_ch03_0010_validationSubjob. As you can see, the reject flow has been attached and the output is being sent to a temporary store (tHashOutput).
How to do it…
- Add the tJava, tDie, tHashInput, and tFileOutputDelimited components to the canvas.
- Add an OnSubjobOk trigger from the tFileInputDelimited component to the tJava component.
- Add a flow from the tHashInput component to the tFileOutputDelimited component.
- Right-click the tJava component, select Trigger and then Run if. Link the trigger to the tDie component. Click the if link, and add the following code:
((Integer)globalMap.get("tFileOutputDelimited_1_NB_LINE")) > 0
- Right-click the tJava component, select Trigger, and then Run if. Link this trigger to the tHashInput component.
- The job should now look like the following:
- Drag the generic schema sc_cook_ch3_0010_genericCustomer to both the tHashInput and tFileOutputDelimited.
- Run the job. You should see that the tDie component is activated because the file contained two errors.
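The Run if condition used in the steps above reads a row counter from globalMap and compares it with zero. The sketch below models globalMap as a plain Map<String, Object> so that the logic can be tried outside Talend. The helper names are hypothetical, and the condition for the second Run if link is not given in the steps; the complementary test (a count equal to zero) is assumed here.

```java
import java.util.Map;

final class RunIfSketch {
    private static final String REJECT_COUNT_KEY = "tFileOutputDelimited_1_NB_LINE";

    private RunIfSketch() {}

    /** Null-safe read of an NB_LINE style counter; a missing key counts as zero. */
    static int lineCount(Map<String, Object> globalMap, String key) {
        Object value = globalMap.get(key);
        return value instanceof Integer ? (Integer) value : 0;
    }

    /** Mirrors the condition on the link to tDie: at least one reject was written. */
    static boolean shouldDie(Map<String, Object> globalMap) {
        return lineCount(globalMap, REJECT_COUNT_KEY) > 0;
    }

    /** Assumed condition for the link to tHashInput: no rejects were written. */
    static boolean shouldContinue(Map<String, Object> globalMap) {
        return lineCount(globalMap, REJECT_COUNT_KEY) == 0;
    }
}
```

The null-safe read is a design choice of this sketch: casting a missing entry to Integer and comparing it with 0 would throw a NullPointerException during unboxing, whereas here an absent counter is simply treated as zero rejects.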
How it works…
Valid rows are held in temporary storage (tHashOutput) and invalid rows are written to a reject file until all input rows are processed.
The job then checks to see how many records are rejected (using the RunIf link). In this instance, there are invalid rows, so the RunIf link is triggered, and the job is killed using tDie.
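The same validate-everything-first pattern can be sketched in plain Java. This is an illustration under assumed names, not the Job itself: the staged list stands in for the in-memory hash store, the rejects list stands in for the reject file, and the exception plays the part of tDie.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class ValidateThenLoad {
    /** Thrown after the whole input has been scanned and at least one row was rejected. */
    static final class RejectsFoundException extends RuntimeException {
        final List<String> rejects;
        RejectsFoundException(List<String> rejects) {
            super(rejects.size() + " row(s) rejected; nothing was written to the target");
            this.rejects = rejects;
        }
    }

    private ValidateThenLoad() {}

    /** Scans all input first; writes to target only if no row was rejected. */
    static int load(List<String> input, Predicate<String> isValid, List<String> target) {
        List<String> staged = new ArrayList<>();   // stands in for the temporary hash store
        List<String> rejects = new ArrayList<>();  // stands in for the reject file
        for (String row : input) {
            (isValid.test(row) ? staged : rejects).add(row);
        }
        if (!rejects.isEmpty()) {
            // Every reject is reported at once, and the target has not been touched.
            throw new RejectsFoundException(rejects);
        }
        target.addAll(staged);
        return staged.size();
    }
}
```

Because the target is written only after the full scan succeeds, a failed run leaves nothing to roll back, and the exception carries the complete list of rejected rows for the support team.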
By ensuring that the data is correct before we start to process it into a target, we know that the data will be fit for writing to the target, thus avoiding the need for rollback procedures.
The records captured can then be sent to the support team, who will then have a record of all incorrect rows. These rows can be fixed in situ within the source file and the job simply re-run from the beginning.
This recipe is particularly important when rollback/correction of a job may be particularly complex, or where there may be a higher than expected number of errors in an input.
An example would be when there are multiple executions of a job that appends to a target file. If the job fails midway through, then rolling back involves identifying which records were appended to the file by the job before failure, removing them from the file, fixing the offending record, and then re-running. This runs the risk of a second error causing the same thing to happen again.
On the other hand, if the job does not die, but a subsection of the data is rejected, then the rejects must be manipulated into the target file via a second manual execution of the job.
So, this method enables us to be certain that our records will not fail to write due to incorrect data and therefore saves our target from becoming corrupted.
Validating against the schema
The tSchemaComplianceCheck is a very useful component for ensuring that the data passing downstream is correct with respect to the defined schema.
Open the job jo_cook_ch03_0020_schemaCompliance.
How to achieve it…
- Run the job. You should see two rows being rejected.
- Add a tSchemaComplianceCheck and two tLogRow components. Right-click tSchemaComplianceCheck_1 and select Row, then Rejects. Join this flow to one of the new tLogRow components, and connect the main flow to the other.
- Now, when you run the job, you will see an additional reject row being output from the tSchemaComplianceCheck component.
How it works…
The tFileInputDelimited component will detect only some of the anomalies within the data, whereas the tSchemaComplianceCheck component will perform a much more thorough validation of the data.
If you look at the output, you will see a log entry showing that the name field has exceeded the maximum length defined in the schema.
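A schema compliance check of this kind can be pictured as a loop over column definitions that tests each value against the rules for its column. The Java sketch below is a simplified illustration, not the component's implementation: it covers only nullability and maximum length, and the Column class, the check method, and the error message wording are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SchemaComplianceSketch {
    /** Hypothetical column definition: name, nullability, and maximum string length. */
    static final class Column {
        final String name;
        final boolean nullable;
        final int maxLength;
        Column(String name, boolean nullable, int maxLength) {
            this.name = name;
            this.nullable = nullable;
            this.maxLength = maxLength;
        }
    }

    private SchemaComplianceSketch() {}

    /** Returns one message per violation; an empty list means the row complies. */
    static List<String> check(List<Column> schema, Map<String, String> row) {
        List<String> errors = new ArrayList<>();
        for (Column col : schema) {
            String value = row.get(col.name);
            if (value == null) {
                if (!col.nullable) {
                    errors.add(col.name + ": null value not allowed");
                }
            } else if (value.length() > col.maxLength) {
                errors.add(col.name + ": exceeds max length " + col.maxLength);
            }
        }
        return errors;
    }
}
```

A row whose name value is longer than the length declared for that column would come back with a single error naming the column, which is the sort of message a reject row can carry downstream.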
Rejecting rows using tMap
This recipe shows how tMap can be used to ensure that unwanted rows are not propagated downstream. This may be as a result of the filter criteria or a validation rule.
Open the job jo_cook_ch03_0030_tMapRejects.
How to do it…
- Open the tMap and click the Activate/inactivate expression filter button for the validRows output.
- In the Expression box, add the code age >= 18.
- Click on the tMapRejects output and then on the tMap settings button.
- Click on the Catch output reject value column to set it to true.
- Run the job. You should see that one of the rows has been rejected.
How it works…
In this example, tMap is working like an if statement. If a customer’s age is eighteen or over, the record is written to validRows; otherwise, the data is passed to tMapRejects.
You can use this method to test for multiple different rejects, by adding additional outputs and adding different filter criteria to each output.
The tMap component will process any number of filter criteria from top to bottom, as long as you remember to catch the output rejects for each additional output table.
Note that if you forget to set catch output rejects to true, then all the input records will be passed to all the outputs. Sometimes, this may be what you want to do, but in the case of the preceding exercise, forgetting to set the catch output rejects would result in rows being duplicated in both of the output streams.
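The routing behaviour described here can be modelled with a small Java sketch. It is a deliberately simplified model under stated assumptions, not tMap's actual implementation: each output has an optional filter and a catch-reject flag, regular outputs are evaluated from top to bottom, and catch-reject outputs receive only rows that no regular output accepted. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class MultiOutputSketch {
    /** Hypothetical output table: a name, an optional filter, and a catch-reject flag. */
    static final class Output<T> {
        final String name;
        final Predicate<T> filter;   // null means "no filter"
        final boolean catchReject;
        Output(String name, Predicate<T> filter, boolean catchReject) {
            this.name = name;
            this.filter = filter;
            this.catchReject = catchReject;
        }
        boolean accepts(T row) {
            return filter == null || filter.test(row);
        }
    }

    private MultiOutputSketch() {}

    /** Routes every row to the outputs, in declaration order, and returns rows per output. */
    static <T> Map<String, List<T>> route(List<T> rows, List<Output<T>> outputs) {
        Map<String, List<T>> result = new LinkedHashMap<>();
        for (Output<T> out : outputs) {
            result.put(out.name, new ArrayList<>());
        }
        for (T row : rows) {
            boolean acceptedSomewhere = false;
            // Regular outputs: each one receives the row if its filter passes.
            for (Output<T> out : outputs) {
                if (!out.catchReject && out.accepts(row)) {
                    result.get(out.name).add(row);
                    acceptedSomewhere = true;
                }
            }
            // Catch-reject outputs: only rows that no regular output accepted.
            if (!acceptedSomewhere) {
                for (Output<T> out : outputs) {
                    if (out.catchReject && out.accepts(row)) {
                        result.get(out.name).add(row);
                    }
                }
            }
        }
        return result;
    }
}
```

In this model, an unfiltered second output with the flag left at false behaves like any other regular output and receives every input row, so rows that passed the first filter appear in both outputs, which is the duplication the note above warns about.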