Mindmajix

Developing a Custom Transformation Component

Building the Transformation Component

In this section, you build the transformation that is going to take data from the upstream Source adapter. After reversing the strings, it will pass the data to the downstream component. In this example, the downstream component will be the Destination adapter, which you’ll write after you’re done with the transformation. The component needs a few things prepared in advance in order to execute efficiently during its lifetime (ReverseString.cs):

private ColumnInfo[] _inputColumnInfos;
const string ErrorInvalidUsageType = “Invalid UsageType for column
‘{0}'”;
const string ErrorInvalidDataType = “Invalid DataType for column
‘{0}'”;
CLSCompliant(false)]
public struct ColumnInfo
{
public int bufferColumnIndex;
public DTSRowDisposition columnDisposition;
public int lineageID;
}

The struct that you create here, called ColumnInfo, is something you use in various guises repeatedly in your components. It is very useful for storing details about columns that you will need later in the component. In this component, you will store the BufferColumnIndex, which indicates where the column is in the buffer, so that you can retrieve the data. You’ll store how the user wants the row to be treated in an error, and you’ll store the column’s LineageID, which helps to retrieve the column from the InputColumnCollection.

Design-Time Methods

Logically, it would make sense to code the component beginning with the design time, followed by the runtime. When your component is dropped into the SSIS Package Designer surface, it first makes a call to ProvideComponentProperties. In this component, you want to set up an input and an output, and you need to tell your component how it should handle data — as in whether it is a synchronous or an asynchronous transformation, as discussed earlier in the chapter. Just as you did with the Source adapter, we’ll look at the whole method first and then examine parts of the method in greater detail. Here is the method in full (ReverseString.cs) 

public override void ProvideComponentProperties()
{
ComponentMetaData.UsesDispositions = true;
IDTSInput100 ReverseStringInput =
ComponentMetaData.InputCollection.New();
ReverseStringInput.Name = “RSin”;
ReverseStringInput.ErrorRowDisposition =
DTSRowDisposition.RD_FailComponent;
IDTSOutput100 ReverseStringOutput =
ComponentMetaData.OutputCollection.New();
ReverseStringOutput.Name = “RSout”;
ReverseStringOutput.SynchronousInputID = ReverseStringInput.ID;
ReverseStringOutput.ExclusionGroup = 1;
AddErrorOutput(“RSErrors”, ReverseStringInput.ID,
ReverseStringOutput.ExclusionGroup);
}

Breaking it down, you first tell the component to use dispositions:

ComponentMetaData.UsesDispositions = true;

In this case, you’re telling the component that it can expect an error output. Now you move on to adding an input to the component:

// Add a new Input, and name it.
IDTSInput100 ReverseStringInput =
ComponentMetaData.InputCollection.New();
ReverseStringInput.Name = “RSin”;
// If an error occurs during data movement, then the component will
fail.
ReverseStringInput.ErrorRowDisposition =
DTSRowDisposition.RD_FailComponent;

Next, you add the output to the component:

// Add a new Output, and name it.
IDTSOutput100 ReverseStringOutput =
ComponentMetaData.OutputCollection.New();
ReverseStringOutput.Name = “RSout”;
// Link the Input and Output together for a synchronous behavior
ReverseStringOutput.SynchronousInputID = ReverseStringInput.ID;

This is similar to adding the input, except that you specify that this is a synchronous component by setting the SynchronousInputID on the output to the ID of the input you created earlier. If you were creating an asynchronous component, you would set the SynchronousInputID of the output to be 0, like this:

ReverseStringOutput.SynchronousInputID = 0

This tells SSIS to create a buffer for the output that is separate from the input buffer. This is not an asynchronous component, though; you will revisit some of the subtle differences later 

AddErrorOutput creates a new output on the component and tags it as being an error output by setting the IsErrorOut property to true. To the method, you pass the name of the error output you want, the input’s ID property, and the output’s ExclusionGroup. An ExclusionGroup is needed when two outputs use the same synchronous input. Setting the exclusion group enables you to direct rows to the correct output later in the component using DirectRow.

AddErrorOutput(“RSErrors”,
ReverseStringInput.ID,ReverseStringOutput.ExclusionGroup);
ReverseStringOutput.ExclusionGroup = 1;

That’s it for ProvideComponentProperties.

Now you’ll move on to the Validate method. As mentioned earlier, this method is called on numerous occasions, and it is your opportunity within the component to check whether what has been specified by the user is allowable by the component

Here is your completed Validate method (ReverseString.cs):

[CLSCompliant(false)]
public override DTSValidationStatus Validate()
{
bool Cancel;
if (ComponentMetaData.AreInputColumnsValid == false)
return DTSValidationStatus.VS_NEEDSNEWMETADATA;
foreach (IDTSInputColumn100 inputColumn in
ComponentMetaData.InputCollection[0].InputColumnCollection) \
{
if (inputColumn.UsageType != DTSUsageType.UT_READWRITE)
{
ComponentMetaData.FireError(0, inputColumn.IdentificationString,
String.Format(ErrorInvalidUsageType, inputColumn.Name), “”,
0, out Cancel);
return DTSValidationStatus.VS_ISBROKEN;
}
if (inputColumn.DataType != DataType.DT_STR && inputColumn.DataType
!=
DataType.DT_WSTR)
{
ComponentMetaData.FireError(0, inputColumn.IdentificationString,
String.Format(ErrorInvalidDataType, inputColumn.Name), “”,
0, out Cancel);
return DTSValidationStatus.VS_ISBROKEN;
}
}
return base.Validate();
}

This method will return a validation status to indicate the overall result and may cause subsequent methods to be called. Refer to the SQL Serverdocumentation for a complete list of values (see DTSValidationStatus). 

Now, to break down the Validate method. A user can easily add and remove an input from the component at any stage and later add it back. It may be the same one, but it may be a different one, presenting the component with an issue. When an input is added, the component stores the LineageIDs of the input columns. If that input is removed and another is added, those LineageIDs may have changed because something such as the query used to generate those columns may have changed. Therefore, you are presented with different columns, so you need to determine whether that has happened; if so, you need to invalidate the LineageIDs. If that’s the case, the component will call ReinitializeMetaData.

if (ComponentMetaData.AreInputColumnsValid == false)

{ return DTSValidationStatus.VS_NEEDSNEWMETADATA; }

Next, you should ensure that each of the columns in the InputColumnCollection chosen for the component has been set to READ WRITE. This is because you will be altering them in place — in other words, you will read a string from a column, reverse it, and then write it back over the old string. If they are not set to READ WRITE, you need to feed that back by returning VS_ISBROKEN. You can invoke the FireError method on the component, which results in a red cross displayed on the component, along with tooltip text indicating the exact error:

if (RSincol.UsageType != DTSUsageType.UT_READWRITE)
{
ComponentMetaData.FireError(0, inputColumn.IdentificationString,
String.Format(ErrorInvalidUsageType, inputColumn.Name), “”,
0, out Cancel);
return DTSValidationStatus.VS_ISBROKEN;
}

The last thing you do in Validate is verify that the columns selected for the component have the correct data types:

if (inputColumn.DataType != DataType.DT_STR && inputColumn.DataType
!=
DataType.DT_WSTR)

If the data type of the column is not in the list, then you again fire an error and set the return value to VS_ISBROKEN.

Now you will look at the workhorse method of so many of your components: ReinitializeMetaData. Here is the method in full (ReverseString.cs):

public override void ReinitializeMetaData()
{
if (!ComponentMetaData.AreInputColumnsValid)
{
ComponentMetaData.RemoveInvalidInputColumns();
}
base.ReinitializeMetaData();
}

Remember that if Validate returns VS_NEEDSNEWMETADATA, then the component internally automatically calls ReinitializeMetaData. The only time you do that for this component is when you have detected that the LineageIDs of the input columns are not quite as expected — that is to say, they do not exist on any upstream column and you want to remove them:

if (!ComponentMetaData.AreInputColumnsValid)
{
ComponentMetaData.RemoveInvalidInputColumns(); }

You finish by calling the base class’s ReinitializeMetaData method as well. Earlier, we referred to this method as the workhorse of your component because you can perform all kinds of triage on it to rescue the component from an aberrant user.

The SetUsageType method (ReverseString.cs) is called when the user is manipulating how the column on the input will be used by the component. In this component, this method validates the data type of the column and whether the user has set the column to be the correct usage type. The method returns an IDTSInputColumn, and this is the column being manipulated:

[CLSCompliant(false)]
public override IDTSInputColumn100 SetUsageType(int inputID,
IDTSVirtualInput100
virtualInput, int lineageID, DTSUsageType usageType)
{
IDTSVirtualInputColumn100 virtualInputColumn =
virtualInput.VirtualInputColumnCollection.GetVirtualInputColumnByLineageID(
lineageID);
if (usageType == DTSUsageType.UT_READONLY)
throw new Exception(String.Format(ErrorInvalidUsageType,
virtualInputColumn.Name));
if (usageType == DTSUsageType.UT_READWRITE)
{
if (virtualInputColumn.DataType != DataType.DT_STR &&
virtualInputColumn.DataType != DataType.DT_WSTR)
{
throw new Exception(String.Format(ErrorInvalidDataType,
virtualInputColumn.Name));
}
}
return base.SetUsageType(inputID, virtualInput, lineageID,
usageType);
}

The first thing the method does is get a reference to the column being changed, from the virtual input, which is the list of all upstream columns available.

You then perform the tests to ensure the column is suitable, before proceeding with the request through the base class. In this case, you want to ensure that the user picks only columns of type string. Note that this method looks a lot like the Validate method. The only real difference is that the Validate method, obviously, returned a different object, but it also reported errors back to the component. Validate uses the FireError method, but SetUsageType throws an exception; in SetUsageType you are checking against the VirtualInput, and in Validate() you check against the Input100. (We used to use FireError here, but the results bubbled back to the user weren’t as predictable, and we were advised that the correct behavior is to throw a new exception.) This method along with others such as InsertOutput, DeleteInput, InsertInput, OnInputAttached, and so on are important because they are the key verification methods you can use that enable you to validate in real time a change that is made to your component, and prevent it if necessary.

The InsertOutput design-time method is called when a user attempts to add an output to the component. In this component, you want to prohibit that, so if the user tries to add an output, you should throw an exception indicating that it is not allowed:

[CLSCompliant(false)]
public override IDTSOutput100 InsertOutput(DTSInsertPlacement
insertPlacement, int
outputID)
{
throw new Exception(“You cannot insert an output (” +
outputID.ToString() + “)”);
}

You do the same when the user tries to add an input to your component in the InsertInput method:

[CLSCompliant(false)]
public override IDTSInput100 InsertInput(DTSInsertPlacement
insertPlacement, int
inputID)
{
throw new Exception(“You cannot insert an input (” +
inputID.ToString() + “)”);
}

Notice again how in both methods you throw an exception in order to tell users that what they requested is not allowed.

If the component were asynchronous, you would need to add columns to the output yourself. You have a choice of methods in which to do this. If you want to add an output column for every input column selected, then the SetUsageType method is probably the best place to do that. This is something about which tutorials Online agrees. Another method for doing this might be OnInputPathAttached.

The final two methods you’ll look at for the design-time methods are the opposite of the previous two. Instead of users trying to add an output or an input to your component, they are trying to remove one of them. You do not want to allow this either, so you can use the DeleteOutput and the DeleteInput methods to tell them. Here are the methods as implemented in your component.

First the DeleteInput method (ReverseString.cs):

[CLSCompliant(false)]
public override void DeleteInput(int inputID)
{
throw new Exception(“You cannot delete an input”);
}

Now the DeleteOutput method:

[CLSCompliant(false)]
public override void DeleteOutput(int outputID)
{
throw new Exception(“You cannot delete an output”);
}

That concludes the code for the design-time part of your Transformation Component.

Runtime Methods

The first runtime method you’ll be using is the PreExecute method. As mentioned earlier, this is called once in your component’s life, and it is where you typically do most of your setup using the state-holding struct mentioned at the top of this section. It is the first opportunity you get to access the Buffer Manager, providing access to columns within the buffer, which you will need in ProcessInput as well. Keep in mind that you will not be getting a call to PrimeOutput, because this is a synchronous component, and PrimeOutput is not called in a synchronous component. Here is the PreExecute method in full (ReverseString.cs):

public override void PreExecute()
{
// Prepare array of column information. Processing requires
// lineageID so we can do this once in advance.
IDTSInput100 input = ComponentMetaData.InputCollection[0];
_inputColumnInfos = new
ColumnInfo[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
_inputColumnInfos[x] = new ColumnInfo();
_inputColumnInfos[x].bufferColumnIndex =
BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
_inputColumnInfos[x].columnDisposition = column.ErrorRowDisposition;
_inputColumnInfos[x].lineageID = column.LineageID;
}
}

This method first gets a reference to the input collection. The collection is zero-based, and because you have only one input, you have used the indexer and not the name, though you could have used the name as well:

IDTSInput100 input = ComponentMetaData.InputCollection[0];

At the start of this section was a list of the things your component would need
later. This included a struct that you were told you would use in various guises, and it also included an array of these structs. You now need to size the array, which you do here by setting it to the count of columns in the InputColumnCollection for your component:

_inputColumnInfos = new
ColumnInfo[input.InputColumnCollection.Count];

Now you loop through the columns in the InputColumnCollection. For each of the columns, you create a new instance of a column and a new instance of the struct:

IDTSInputColumn100 column = input.InputColumnCollection[x];
_inputColumnInfos[x] = new ColumnInfo();

You then read from the column the details you require and store them in the ColumnInfo object. The first thing you want to retrieve is the column’s location in the buffer. You cannot simply do this according to the order that you added them to the buffer. Though this would probably work, it is likely to catch you out at some point. You can find the column in the buffer by using a method called FindColumnByLineageID on the BufferManager object. This method takes the buffer and the LineageID of the column that you wish to find as arguments:

_inputColumnInfos[x].bufferColumnIndex =
BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);

You now need only two more details about the input column: the LineageID and the Error RowDisposition. Remember that ErrorRowDisposition tells the component how to treat an error:

_inputColumnInfos[x].columnDisposition = column.ErrorRowDisposition;
_inputColumnInfos[x].lineageID = column.LineageID;

When you start to build your own components, you will see how useful this method really is. You can use it to initialize any counters you may need or to open connections to Data Sources as well as anything else you think of. 

The final method to look at for this component is ProcessInput. Recall that this is a synchronous transformation as dictated in ProvideComponentProperties, and this is the method in which the data is moved and manipulated. This method contains a lot of information that will help you understand the buffer and what to do with the columns in it when you receive them. It is called once for every buffer passed.

 Here is the method in full (ReverseString.cs):

When you start to build your own components, you will see how useful this method really is. You can use it to initialize any counters you may need or to open connections to Data Sources as well as anything else you think of.

The final method to look at for this component is ProcessInput. Recall that this is a synchronous transformation as dictated in ProvideComponentProperties, and this is the method in which the data is moved and manipulated. This method contains a lot of information that will help you understand the buffer and what to do with the columns in it when you receive them. It is called once for every buffer passed.

Here is the method in full (ReverseString.cs):

public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
int errorOutputID = -1;
int errorOutputIndex = -1;
int GoodOutputId = -1;
IDTSInput100 inp =
ComponentMetaData.InputCollection.GetObjectByID(inputID);
#region Output IDs
GetErrorOutputInfo(ref errorOutputID, ref errorOutputIndex);
// There is an error output defined
errorOutputID = ComponentMetaData.OutputCollection[“RSErrors”].ID;
GoodOutputId =
ComponentMetaData.OutputCollection[“ReverseStringOutput”].ID;
#endregion
while (buffer.NextRow())
{/
/ Check if we have columns to process
if (_inputColumnInfos.Length == 0)
{/
/ We do not have to have columns. This is a Sync component so the
// rows will flow through regardless. Could expand Validate to check
// for columns in the InputColumnCollection
buffer.DirectRow(GoodOutputId);
}
else
{
try
{
for (int x = 0; x < _inputColumnInfos.Length; x++)
{
ColumnInfo columnInfo = _inputColumnInfos[x];
if (!buffer.IsNull(columnInfo.bufferColumnIndex))
{/
/ Get value as character array
char[] chars =
buffer.GetString(columnInfo.bufferColumnIndex)
.ToString().ToCharArray();
// Reverse order of characters in array
Array.Reverse(chars);
// Reassemble reversed value as string
string s = new string(chars);
// Set output value in buffer
buffer.SetString(columnInfo.bufferColumnIndex, s);
}}
buffer.DirectRow(GoodOutputId);
}
catch(Exception ex)
{
switch (inp.ErrorRowDisposition)
{
case DTSRowDisposition.RD_RedirectRow:
buffer.DirectErrorRow(errorOutputID, 0, buffer.CurrentRow);
break;
case DTSRowDisposition.RD_FailComponent:
throw new Exception(“Error processing ” + ex.Message);
case DTSRowDisposition.RD_IgnoreFailure:
buffer.DirectRow(GoodOutputId);
break;
}
}
}
}
}

There is a lot going on in this method, so we’ll break it down to make it more manageable. The first thing you do is find out from the component the location of the error output:

int errorOutputID = -1;
int errorOutputIndex = -1;
int GoodOutputId = -1;
#region Output IDs
GetErrorOutputInfo(ref errorOutputID, ref errorOutputIndex);
errorOutputID = ComponentMetaData.OutputCollection[“RSErrors”].ID;
GoodOutputId =
ComponentMetaData.OutputCollection[“ReverseStringOutput”].ID;
#endregion

The method GetErrorOutput returns the output ID and the index of the error output. Remember that you defined the error output in ProvideComponentProperties with the AddErrorOutput function.

Because you could have many inputs to a component, you want to isolate the input for this component. You can do that by finding the output that is passed into the method:

IDTSInput100 inp =
ComponentMetaData.InputCollection.GetObjectByID(inputID);

You need this because you want to know what to do with the row if you encounter an issue. You provided a default value for the ErrorRowDisposition property of the input in ProvideComponentProperties, but this can be overridden in the UI.

Next, you want to check that the upstream buffer has not called SetEndOfRowset, which would mean that it has no more rows to send after the current buffer; however, the current buffer might still contain rows. You then loop through the rows in the buffer like this:

while (buffer.NextRow())

You then check whether the user asked for any columns to be manipulated. Because this is a synchronous component, all columns and rows are going to flow through even if you do not specify any columns for the component. Therefore, you specify that if there are no input columns selected, the row should be passed to the normal output. You do this by looking at the size of the array that holds the collection of ColumnInfo struct objects:

if (_inputColumnInfos.Length == 0)
{
buffer.DirectRow(GoodOutputId);
}

If the length of the array is not zero, then the user has asked the component to perform an operation on the column. In turn, you need to grab each of the ColumnInfo objects from the array so you can look at the data. Here you begin your loop through the columns, and for each column you create a new instance of the ColumnInfo struct:

for (int x = 0; x < _inputColumnInfos.Length; x++)
{
ColumnInfo columnInfo = _inputColumnInfos[x];

You now have a reference to that column and are ready to start manipulating it. You first convert the column’s data into an array of chars:

char[] chars =
buffer.GetString(columnInfo.bufferColumnIndex).ToString().ToCharArray();

The interesting part of this line is the method GetString() on the buffer object. It returns the string data of the column and accepts as an argument the index of the column in the buffer. This is really easy, because you stored that reference earlier in the PreExecute method.

Now that you have the char array, you can perform some operations on the data. In this case, you want to reverse the string. This code is not particular to SSIS, and it’s a trivial example of string manipulation, but you can imagine doing something more useful here such as encryption, cleaning, or formatting:

Array.Reverse(chars);
string s = new string(chars);

Now you reassign the changed data back to the column using the SetString() method on the buffer:

buffer.SetString(columnInfo.bufferColumnIndex, s);

Again, this method takes as one of the arguments the index of the column in the buffer. It also takes the string you want to assign to that column. You can see now why it was important to ensure that this column is read/write. If there was no error, you point the row to the good output buffer:

buffer.DirectRow(GoodOutputId);

If you encounter an error, you want to redirect this row to the correct output or alternately throw an error. You do that in the catch block:

catch(Exception ex)
{
switch (inp.ErrorRowDisposition)
{
case DTSRowDisposition.RD_RedirectRow:
buffer.DirectErrorRow(errorOutputID, 0, buffer.CurrentRow);
break;
case DTSRowDisposition.RD_FailComponent:
throw new Exception(“Error processing ” + ex.Message);
case DTSRowDisposition.RD_IgnoreFailure:
buffer.DirectRow(GoodOutputId);
break;
}
}

The code is mostly self-explanatory. If the input was configured by the user to redirect the row to the error output, then you do that. If it was told to fail the component or the user did not specify anything, then you throw an exception. Otherwise, the component is asked to just ignore the errors and allow the error row to flow down the normal output.

How would this have looked had it been an asynchronous transformation? You would get a buffer from both PrimeOutput and ProcessInput. The ProcessInput method would contain the data and structure that came into the component, and PrimeOutput would contain the structure that the component expects to pass on. The trick here is to get the data from one buffer into the other. Here is one way you can approach it.

At the class level, create a variable of type PipelineBuffer, something like this:

PipelineBuffer _pipelinebuffer;

In PrimeOutput, assign the output buffer to this buffer:

public override void PrimeOutput(int outputs, int[] outputIDs,
PipelineBuffer[]
buffers)
{_
pipelinebuffer = buffers[0];
}

You now have a cached version of the buffer from PrimeOutput, and you can go straight over to ProcessInput and use it. tutorials Online has a great example of doing this in an asynchronous component: navigate to “asynchronous outputs.”

NOTE Don’t hesitate to look through tutorials Online. Microsoft has done a fantastic job of including content that offers good, solid examples. Also search for the SSIS component samples on http://msdn.microsoft.com/en-us/. Visit http://sqlsrvintegrationsrv.codeplex.com/ for some open-source components that you can download, change, and use. The projects here are also a great learning tool because the authors are usually expert SSIS developers.

0 Responses on Developing a Custom Transformation Component"

Leave a Message

Your email address will not be published. Required fields are marked *

Copy Rights Reserved © Mindmajix.com All rights reserved. Disclaimer.
Course Adviser

Fill your details, course adviser will reach you.