You are no doubt already familiar with the term scaling out, and of course the concept can be applied to SSIS systems. Although there are no magical switches here, SSIS offers several interesting features, and in this section you will see how they can be applied and how they can benefit reliability as well.
There are two important architectural features in Integration Services that give the tool great capabilities. The first feature is the Lookup Transformation. The second feature is the data pipeline itself. These features give SSIS the abilities to move and compare data in a very fast and efficient manner.
The Lookup Transformation has the ability to cache data, and it has several options available to optimize that configuration. The Cache Data Sources available to this transformation include pure in-memory cache and persistent file storage (also called persisted cache). Both of these options require the transformation to be configured to use the full-cache option, however. A Connection Manager called the Cache Connection Manager governs these two types of caching options.
Caching operations in the Lookup Transformation provide three modes to choose from: full, partial, or none. Using the full-cache mode, the lookup set will be stored in its entirety and will load before the lookup operation actually runs, but it will not repopulate on every subsequent lookup operation. A partial cache stores only matched lookups, and no-cache will repopulate the cache every time. Of course, you can also manually configure the size of the cache to store, measured in megabytes.
Persistent file storage is another very exciting caching feature, as it will store your cache in a file that can be reused across other packages. Talk about reusability! To add to the benefit, the cache can be loaded from a variety of sources, including text files, XML files, or even web services. Lastly, partialcache mode has a cache option called miss-cache, in which items being processed for lookup that do not have a match will be cached, so they will not be retried in future lookup operations. All these features translate into scalability and performance gains.
The data pipeline is the workhorse of the data processing engine. The data pipeline architecture is able to provide a fully multi-threaded engine for true parallelism. This allows most pipelines to scale out well without a lot of manual tweaking.
Scaling Out Memory Pressures
By design, pipeline processing takes place almost exclusively in memory. This enable faster data movement and transformations, and a design goal should always include making a single pass over your data. This eliminates time consuming staging and the costs of reading and writing the same data several times. The potential disadvantage of this is that for large amounts of data and complicated sets of transformations, you need a large amount of memory, and it needs to be the right type of memory for optimum performance.
The virtual memory space for 32-bit Windows operating systems is limited to 2GB by default. Although you can increase this amount through the use of the /3GB switch applied in the boot.ini file, this often falls short of the total memory currently available. This limit is applied per process, which for your purposes means a single package during execution. Therefore, by partitioning a process across multiple packages, you can ensure that each of the smaller packages is its own process, thereby taking advantage of the full 2–3GB virtual space independently. The most common method of chaining packages together to form a consolidated process is through the Execute Package Task, in which case it is imperative that you set the Child package to execute out of process. To enable this behavior, you must set the ExecuteOutOfProcess property to true.
Note that unlike the SQL Server database engine, SSIS does not support Advanced Windowing Extensions (AWE), so scaling out to multiple packages across processes is the only way to take advantage of larger amounts of memory. If you have a very large memory requirement, then you should consider a 64-bit system for hosting these processes. Whereas just a few years ago it would have been very expensive and hard to find software and drivers for all your needs, 64-bit systems are now common for enterprise hardware and should be considered for any business database.
For a more detailed explanation of how SSIS uses memory, and the inmemory buffer structure used to move data through the pipeline
Scaling Out by Staging Data
The staging of data is very much on the decline; after all, why incur the cost of writing to and reading from a staging area when you can perform all the processing in memory with a single pass of data? With the inclusion of the Dimension and Partition Processing Destinations, you no longer need a physical Data Source to populate your SQL Server Analysis Services (SSAS) cubes — yet another reason for the decline of staging or even the traditional data warehouse. Although this is still a contentious debate, the issue here is this: Should you use staging during the SSIS processing flow? Although it may not be technically required to achieve the overall goal, there are still certain scenarios when you may want to, for both scalability and reliability reasons.
For this discussion, staging could also be described as partitioning. Although the process can be implemented within a single Data Flow Task, for one or more of the reasons described next, it may be subdivided into multiple Data Flow Tasks. These smaller units could be within a single package, or they might be distributed through several, as discussed next. The staged data will be used only by another Data Flow Task and does not need to be accessed directly through regular interfaces. For this reason, the ideal choices for the source and destinations are the raw file adapters. This could be described as vertical partitioning, but you could also overlay a level of horizontal partitioning, by executing multiple instances of a package in parallel.
Raw file adapters enable you to persist the native buffer structures to disk. The in-memory buffer structure is simply dumped to and from the file, without any translation or processing as found in all other adapters, making these the fastest adapters for staging data. You can take advantage of this to artificially force a memory checkpoint to be written to disk, thereby enabling you to span multiple Data Flow Tasks and packages. Staging environments and raw files are also discussed in Understanding and Tuning the Data Flow Engine Topic, but some specific examples are illustrated here.
The key use for raw files is that by splitting one Data Flow Task into at least two individual Data Flow Tasks, the primary task can end with a raw file destination and the secondary task can begin with a raw file source. The buffer structure is exactly the same between the two tasks, so the split is basically irrelevant from an overall flow perspective, but it provides perfect preservation of the two tasks.
Data Flow Restart
As covered previously in this Reliability and Scalability Topic , the checkpoint feature provides the capability to restart a package from the point of failure, but it does not extend inside a Data Flow Task. However, if you divide a Data Flow Task into one or more individual tasks, each linked together by raw files, you immediately gain the capability to restart the combined flow. Through the correct use of native checkpoints at the (Data Flow) task level, this process becomes very simple to manage.
Deciding where to divide a flow is subjective, but two common choices are immediately after extraction and immediately after transformation, prior to load. The post-extraction point offers several key benefits, including performance, meeting source system service level agreements (SLAs), and consistency. Many source systems are remote, so extraction might take place over suboptimal network links, and it can be the slowest part of the process. By staging immediately after the extraction, you don’t have to repeat this slow step in the event of a failure and restart. There may also be an impact on the source system during the extraction, which often takes place during a specified time window when utilization is low. In this case, it may be unacceptable to repeat the extract in the event of a failure until the next time window, usually the following night. Finally, the initial extraction will take place at a more consistent time each night, which could provide a better base for tracking history. Staging post-transformation simply ensures that the transformation is not wasted if the destination system is unavailable. If the transformations are time intensive, this cost-savings can really add up!
You may wish to include additional staging points mid-transformation. These would usually be located after particularly expensive operations and before those that you suspect are at risk to fail. Although you can plan for problems, and the use of error outputs described previously should enable you to handle many situations, you can still expect the unexpected and plan a staging point with this in mind. The goal remains the same — the ability to restart as close to the failure point as possible and to reduce the cost of any reprocessing required.
Below screen shot shows an example data load process that you may wish to partition into multiple tasks to take advantage of Data Flow restart.
For this scenario, the OLE DB Source Component connects to a remote SQL Server over a slow network link. Because of the time taken for this data extraction and the impact on the source system, it is not acceptable to repeat the extract if the subsequent processing fails for any reason. Therefore, you choose to stage data through a raw file immediately after the Source Component. The resulting Data Flow Task layout is shown in below screen shot.
The Flat File Source data is accessed across the LAN, and it needs to be captured before it is overwritten. The sort operation is also particularly expensive because of the volume of data. For this reason, you choose to stage the data after the sort is complete. The resulting Data Flow Task is shown in below screen shot.
Finally, you use a third Data Flow Task to consume the two staging raw files and complete the process, as shown in below screen shot.
Following this example, a single Data Flow Task has been divided into three separate tasks. For the purpose of restarting a failed process, you would use a single package and implement checkpoints on each of the three Data Flow Tasks.
Scaling across Machines
In a similar manner to the Data Flow restart just discussed, you can also use raw file adapters to partition the Data Flow Task. By separating tasks into different packages, you can run packages across machines. This may be advantageous if a specific machine has properties not shared with others. Perhaps the machine capable of performing the extract is situated in a different network segment from the machine best suited for processing the data, and direct access is unavailable between the main processing machine and the source. The extract could be performed, and the main processing machine would then retrieve the raw data to continue the process. These situations are organizational restrictions, rather than decisions driven by the design architecture.
Another method for scaling across machines is to use horizontal partitioning. A simple scenario would utilize two packages. The first package would extract data from the source system, and through the Conditional Split you produce two or more exclusive subsets of the data and write this to individual raw files. Each raw file would contain some of the rows from the extract, as determined by the expression used in the Conditional Split. The most common horizontal partition scheme is time-based, but any method could be used here. The goal is to subdivide the total extract into manageable chunks; so, for example, if a sequential row number is already available in the source, this would be ideal, or one could be applied (see the T-SQL ROW_NUMBER function). Similarly a Row Number Transformation could be used to apply the numbering, which could then be used by the split, or the numbering and splitting could be delivered through a Script Component.
With a sorted data set, each raw file may be written in sequence, completing in order, before moving on to the next one. While this may seem uneven and inefficient, it is assumed that the time delay between completion of the first and final destinations is inconsequential compared to the savings achieved by the subsequent parallel processing.
Once the partitioned raw files are complete, they are consumed by the second package, which performs the transformation and load aspects of the processing. Each file is processed by an instance of the package running on a separate machine. This way, you can scale across machines and perform expensive transformations in parallel. For a smaller-scale implementation, where the previously described 32-bit virtual memory constraints apply, you could parallel process on a single machine, such that each package instance would be a separate thread, with its own allocation of virtual memory space.
For destinations that are partitioned themselves, such as a SQL Server data warehouse with table partitions or a partitioned view model, or Analysis Services partitions, it may also make sense to match the partition schema to that of the destination, such that each package addresses a single table or partition.
Below screen shot shows a sample package that for the purposes of this example you will partition horizontally.
In this scenario, the Fuzzy Lookup is processing names against a very large reference set, and this is taking too long. To introduce some parallel processing, you decide to partition on the first letter of a name field. It is deemed stable enough for matches to be within the same letter, although in a real-world scenario this may not always be true. You use a Conditional Split Transformation to produce the two raw files partitioned from A to M and from N to Z. This primer package is illustrated in below screen shot.
Ideally, you would then have two instances of the second package (Below screen shot) running in parallel on two separate machines. However, you need to ensure that the lookup data is filtered on name to match the raw file. Not all pipeline component properties are exposed as expressions, allowing you to dynamically control them, so you would need two versions of the package, identical except for a different Reference table name property in the Fuzzy Lookup, also shown in below screen shot. In preparation, you would create two views, one for names A to M and the other for names N to Z, to match the two raw files. The two package versions would each use the view to match the raw file they will process.
For any design that uses raw files, the additional I/O cost must be evaluated against the processing performance gains, but for large-scale implementations it offers a convenient way of ensuring consistency within the overall flow and doesn’t incur the translation penalty associated with other storage formats.
Scaling Out with Parallel Loading
You have seen several ways to scale out SSIS in this Reliability and Scalability Topic. Now you will learn how to scale out using parallelization with the data loads. You can accomplish faster load times by loading multiple partitions at once using SSIS from either a staging environment or multiple files of sorted data. If the data resides in a flat file, you may want to consider loading those files into a staging table so the data can be pre-sorted.
The first item you will need is a control table. This will tell the SSIS packages which data to pull for each partition in the destination. The control table will contain a list of data that needs to be checked out. The following code will create this table:
CREATE TABLE [dbo].[ctlTaskQueue](
[TaskQueueID] [smallint] IDENTITY(1,1) NOT NULL,
[PartitionWhere] [varchar](20) NOT NULL,
[Priority] [tinyint] NOT NULL,
[StartDate] [datetime] NULL,
[CompleteDate] [datetime] NULL,
CONSTRAINT [PK_ctlTaskQueue] PRIMARY KEY CLUSTERED
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY
ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY])
The PartitionWhere column is the data used to divide the data up into parallel loading. This will be a part of the WHERE clause that queries from the source of the staging table conditionally. For example, Figure 15-41 shows you a table that’s ready to be used. Each WHERE clause should ideally align with a partition on your destination table. The StartDate and CompleteDate columns show you when the work was checked out and completed.
Run the following code to populate these values:
INSERT INTO [dbo].[ctlTaskQueue]
(2001,1, null, null),
(2002,2, null, null),
(2003,3, null, null),
(2004,4, null, null),
(2005,5, null, null),
(2006,6, null, null),
(2007,7, null, null),
(2008,8, null, null),
(2009,9, null, null),
(2010,10, null, null),
(2011,11, null, null)
Now create an SSIS package named ParallelDemo.dtsx that will perform the parallel load. This section will show you the basics for creating a package to perform a parallel load. You will still need to apply best practices to this package afterward, for example, additional elements like package checkpoints, parameters, and a master package to run all of your parallel loads. Create three SSIS variables in the package, as shown in below screen shot. Don’t worry about the value of the strSQLStatement variable, as you will modify it later to use an expression.
Next drag in a For Loop Container, which will be used to look for new work in the control table you just created. Configure the container to use the intTaskQueueID variable, as shown in below screen shot. The loop will repeat while intTaskQueueID is not 0. The tasks inside the loop will be responsible for setting the variable to 0 once the work is complete.
Next you will run the following code to create a stored procedure that will check out the work to be done. The stored procedure is going to output two variables: one for the task that needs to be worked on (@TaskQueueID) and the filter that will be applied on the source table (@PartitionWhere). After the task is read, it also checks the work out by updating the StartDate column.
CREATE PROCEDURE [dbo].[ctl_UseTask]
@TaskQueueID int OUTPUT,
@PartitionWhere varchar(50) OUTPUT
SELECT TOP 1
@TaskQueueID = TaskQueueID,
@PartitionWhere = PartitionWhere
WHERE StartDate is NULL
AND CompleteDate is NULL
ORDER BY Priority asc
IF @TaskQueueID IS NULL
SET @TaskQueueID = 0
SET StartDate = GETDATE()
WHERE TaskQueueID = @TaskQueueID
Drag an Execute SQL Task into the For Loop Container and configure it to match below screen shot, using the following code in the SQLStatement. This will execute the stored procedure and get the task queue and the WHERE clause values from the table. You could also have altered the stored procedure to have an input variable of what queue you’d like to read from.
DECLARE @return_value int,
EXEC @return_value = [dbo].[ctl_UseTask]
@TaskQueueID = @TaskQueueID OUTPUT,
@PartitionWhere = @PartitionWhere OUTPUT
SELECT convert(int,@TaskQueueID) as N’@TaskQueueID’,
@PartitionWhere as N’@PartitionWhere’
Open the Result Set window in the Execute SQL Task and configure it to match below screen shot. Each value maps to a variable in SSIS. The Result Name of 0 means the first column returned, and the columns are sequentially numbered from there.
Now you will set the variable strSQLStatement to be dynamic by adding the following code to the Expression property in the strSQLStatement variable. This strSQLStatement variable will be used as your OLE DB Source Component in the Data Flow Task you will create in a moment. This query could be whatever you’d like. In this example, you are gathering data out of a Transactions table between a given date range. The strWherePartition is populated from the previous Execute SQL Task, and the date range should match your target table’s partition layout. The ORDER BY clause is going to presort the data on the way out. Ideally this column would have a clustered index on it to make the sort more efficient.
WHERE [ShipDate] > ‘”+ @[User::strWherePartition] + “-01-01’ and
[ShipDate] <= ‘” + @[User::strWherePartition] + “-12-31’ ORDER BY
Now you are ready to create the Data Flow Task. Drag a Data Flow Task over and connect it to the Execute SQL Task inside the loop. In the Data Flow tab, drag an OLE DB Source Component over and connect it to your source connection manager and use the strSQLStatement variable as your source query, as seen in below screen shot.
You can connect the OLE DB Source Component to any other component you’d like for this example, such as a Union All Transformation. You are using a Union All Transformation as a temporary dead-end destination so you can see the number of rows that flow down that part of the Data Flow path. You can also use a Data Viewer before the Union All Transformation to see the data in those rows. Ideally though, you should connect the source to any business logic that you may have and then finally into a destination. Your ultimate destination should ensure that the data is committed into a single batch. Otherwise, fragmentation may occur. You can do this by ensuring that the destination’s “Max Insert Commit Size” is a high number, well above the number of rows that you expect (though in some cases you may not know this number). Keep in mind that the source query partition should have already sorted the data and be a small enough chunk of data to not cause major impact to the transaction log in SQL Server.
Back in the Control Flow, you’ll want to add conditional logic between the Execute SQL Task and the Data Flow Task to execute the Data Flow Task only if more work is pending. To do this, double-click on the green precedence constraint connecting the two tasks and change the Evaluation operation property to be Expression and Constraint. Then, set the expression to @intTaskQueueID > 0, as seen in below screen shot.
The final step is to update the queue table to say the work is complete. You can do this by connecting an Execute SQL Task to the Data Flow Task. This task should update the ctlTaskQueue table in the AdventureWorksDW database to show the CompleteDate to the current time. This can be done in the following query or stored procedure:
SET CompleteDate = GETDATE()
WHERE TaskQueueID = ?
The question mark in the previous code will be mapped to the intTaskQueueID variable in the Parameter Mapping tab in the Execute SQL Task. The final Control Flow of the package should look like Below screen shot.
With the plumbing done, you’re now ready to create parallelism in your package. One way of accomplishing this is through a batch file. The following batch file will perform this with the START command. Optionally, you can reset your tables through the SQLCMD executable, as shown here as well. The batch file will execute DTEXEC.exe N number of times in parallel and work on independent workloads. Where you see the SQLCMD command, you may also want to ensure that the trace flags are on if they’re not on already.
sqlcmd -E -S localhost -d AdventureWorksDW -Q “update
dbo.ctlTaskQueue set StartDate = NULL, CompleteDate = NULL”
FOR /L %%i IN (1, 1, %1) DO (
ECHO Spawning thread %%i
START “Worker%%i” /Min “C:\Program Files\Microsoft SQL
“C:\ProSSIS\Code\Ch15\09_ParallelDemo.dtsx” /CHECKPOINTING OFF
The %1 in the preceding code represents the N variable that is passed in. To use the batch file, use the following command, where 5 represents the number of DTEXEC.exe commands that you wish to run in parallel. Save this file as ParallelBatch.cmd. You can do this from Notepad. Then, run the batch file with the following code in a command window.
As each worker thread spins up, it will log its number. The results will look like below screen shot.
WARNING Beware of executing packages from the command line. Even though executing packages from the command line can seem like an easy way to achieve parallelism and scalability, the execution functionality is limited. Using the DTEXEC.exe command will only work with Legacy Package Model features. If you want to take advantage of the latest Project Model features, you will need to utilize an alternative method.
One final note about parallelism is that if you load data using multiple cores into a target table, it will cause fragmentation in the target table. Each core would handle sorting its individual chunk of data that it’s responsible for; then when each chunk of data is pieced together, data may be inserted into different pages. One way to prevent that is to ensure that each thread of DTEXEC.exe connects to a different TCP/IP port and in soft NUMA node. With soft NUMA, you can bind any port or IP address to a given CPU core. By doing this, each SQL Server Destination in the Data Flow Task would always use only one CPU in SQL Server. It essentially allows SSIS and the SQL Server Destination to mimic MAXDOP 1 in T-SQL.
There is more setup needed to create the soft NUMA node. A complete soft NUMA explanation is beyond the scope of this Tutorial, and you can learn more on SQL Server Tutorial Online. The following steps will point you in the right direction to getting soft NUMA working:
- Create the soft NUMA node in the registry of the SQL Server instance, which isolates each node to a CPU core.
- Bind the soft NUMA nodes to a given port in SQL Server Configuration Manager under Network Configuration.
- Restart the SQL Server instance.
- Change the package to bind the connection manager in the worker package to a different port by changing the ServerName property and adding the port following the server name (InstanceName, 1433, for example). You’ll want this port to be dynamically set through an expression and variable so the variable can be passed in through the calling batch file.