Wednesday, 5 January 2011

SSIS: Adding a Data Flow Task to an SSIS package with C# (Part V)

Hi,

This is the fifth post (part I, part II, part III, part IV) about programming SSIS components in C#. In this post I will explain the Data Flow task. There's very little information available on this subject on the internet. The information that I used for this post can be found on MSDN and in a post on the Microsoft forums (Jessica Elise).

While researching certain classes, methods and properties of the Data Flow task and the data flow components, I came across some very interesting sites:
  • I found some code on BlackDuckKoders.com.
  • Another interesting blog I found on MSDN is from mmasson.
  • On the SSISBI.com blog of Duane Douglas there is quite some information available on programming SSIS packages.
  • On SQL Lion I found an example of coding a Lookup transformation. Maybe there is more information available there.
  • Thinker wrote two articles on his blog; an interesting one is about programming the Merge Join.

This is the SSIS package that is generated by the code in this post:


The OLE DB source component reads data from the AdventureWorks2008R2 database with the data access mode "Table or view". I tried the "SQL command" mode but I didn't get it to work properly (yet). So the data is read from the Production.Product table and written to the dbo.Product table in an Import_DB database. Below you can see the two editor windows of the generated components.


Okay, what are the steps that need to be taken when you program a Data Flow task in C#? There are 13 steps:
  1. Create an application.
  2. Create a package.
  3. Add a connection for the source adapter to the package.
  4. Add a connection for the destination adapter to the package.
  5. Add a Data Flow task to the package.
  6. Create a source component in the Data Flow task.
  7. Connect the source connection with the source component.
  8. Create a destination component in the Data Flow task.
  9. Connect the destination connection with the destination component.
  10. Create the path between the source and the destination component.
  11. Map the fields from the source to the destination.
  12. Write the package to a file (you can also write it to SQL Server).
  13. Dispose the objects.

In contrast with my former posts about programming SSIS packages with C#, quite a few steps are needed before you have a successful Data Flow task in a package. Let's explain the steps in more detail.

Please note that this was created with SQL Server 2008 R2 and that I used an alias for Microsoft.SqlServer.Dts.Runtime.

Put this code at the top of your script:

using DTS = Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;


1. Create an application.
The first thing you have to do is instantiate a new Application object. With the Application class it's possible to discover and access package objects. It can also access collections and properties that contain information about the system.

            // Create an application
        DTS.Application app = new DTS.Application(); 


2. Create a package.
After creating the application object, the next step is creating the package object. The package is a collection of other containers, connections, tasks, transformations, variables, configurations, and precedence constraints.

            // Create a package
       DTS.Package pkg = new DTS.Package();

       //Setting some properties
       pkg.Name = @"MyProgrammedDataflowTask";


3. Add a connection for the source adapter to the package.
Data comes from a source and is transferred to a destination. So a connection to the source is needed, and it is created with the ConnectionManager class. The ConnectionManager class provides the information that you need to connect to a data source.

       //Adding a connection to the database AdventureWorks2008R2
       DTS.ConnectionManager ConnMgrAdvent = pkg.Connections.Add("OLEDB");
       ConnMgrAdvent.ConnectionString = @"Provider=SQLOLEDB.1;" +
                "Integrated Security=SSPI;Initial Catalog=AdventureWorks2008R2;" +
                "Data Source=(local);";
       ConnMgrAdvent.Name = @"AdventureWorks2008R2";
       // Keep the verbatim string on one line so no line break ends up in the description.
       ConnMgrAdvent.Description = @"SSIS Connection Manager for OLEDB Source AdventureWorks2008R2";


4. Add a connection for the destination adapter to the package.
Logically, the connection for the destination needs to be created too.

        //Adding a connection to the database Import_DB
        DTS.ConnectionManager ConnMgrImport_DB = pkg.Connections.Add("OLEDB");
        ConnMgrImport_DB.ConnectionString = @"Provider=SQLOLEDB.1;" +
                "Integrated Security=SSPI;Initial Catalog=Import_DB;" +
                "Data Source=(local);";
        ConnMgrImport_DB.Name = @"Import_DB";
        // Keep the verbatim string on one line so no line break ends up in the description.
        ConnMgrImport_DB.Description = @"SSIS Connection Manager for OLEDB Destination Import_DB";


5. Add a Dataflow task to the package.
Now it's time to add the Data Flow task to the package. This is done with the Executable and Executables classes. The Add method of the Executables class adds an executable to the package (ForEachLoop, ForLoop, Package, Sequence or TaskHost). TaskHost is the interesting class in this post. I haven't found an official list of the so-called STOCK components on MSDN (maybe here), but I did find a list for 2008 on SQLIS.com. It is a bit strange that SQLIS talks about "DTSAdapter.OleDbSource.2" while my code uses "DTSAdapter.OleDbSource"; the ".2" suffix looks like a version number.

            //Adding the dataflow task to the package
        DTS.Executable exe = pkg.Executables.Add("STOCK:PipelineTask");
        DTS.TaskHost TKHSQLHost = (DTS.TaskHost)exe;
        TKHSQLHost.Name = "This is a programmed DataFlowTask";
        MainPipe dataFlowTask = (MainPipe)TKHSQLHost.InnerObject;


6. Create a source component in the Data Flow task.
The next thing is building a source component in the Data Flow task. IDTSComponentMetaData100 is an interface, and I haven't found much (interesting) information about it on MSDN or Google. The CManagedComponentWrapper interface configures the properties and column collections of a component. ProvideComponentProperties() is called when a component is first added to the Data Flow task, to initialize the ComponentMetaData of the component.
 
            // Create the source component.
        IDTSComponentMetaData100 source =  
                        dataFlowTask.ComponentMetaDataCollection.New();
        source.ComponentClassID = "DTSAdapter.OleDbSource";
        CManagedComponentWrapper srcDesignTime = source.Instantiate();
        srcDesignTime.ProvideComponentProperties();


7. Connect the source connection with the source component.
The next step is connecting the source connection with the source component. The RuntimeConnectionCollection contains the connection objects defined by the component. I have found very little information about GetExtendedInterface. With SetComponentProperty it's possible to set the value of an IDTSCustomProperty100. AcquireConnections is called during both component design and execution. Components that connect to external data sources should establish their connections in this method. ReinitializeMetaData repairs any errors identified during validation that cause the component to return VS_NEEDSNEWMETADATA at design time. I'm not sure, but sometimes when I edit SSIS packages a red error appears, and when I open and close the component the error disappears. Perhaps ReinitializeMetaData is called then? The ReleaseConnections method frees the connections established during AcquireConnections. It can be called at design time and at run time.

            // Assign the connection manager.
       if (source.RuntimeConnectionCollection.Count > 0)
       {
          source.RuntimeConnectionCollection[0].ConnectionManager =
                             DTS.DtsConvert.GetExtendedInterface(ConnMgrAdvent);
          source.RuntimeConnectionCollection[0].ConnectionManagerID =
                             pkg.Connections["AdventureWorks2008R2"].ID;
       }
       // Set the custom properties of the source.
       srcDesignTime.SetComponentProperty("AccessMode", 0);
       srcDesignTime.SetComponentProperty("OpenRowset", "[Production].[Product]");

       // Connect to the data source, and then update the metadata for the source.
       srcDesignTime.AcquireConnections(null);
       srcDesignTime.ReinitializeMetaData();
       srcDesignTime.ReleaseConnections();
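
The "SQL command" access mode that I could not get working uses the same two calls with different values. Below is a minimal sketch, not a tested recipe: the class and method names (OleDbSourceHelper, UseSqlCommand) are made up for illustration, and it assumes the connection manager has already been assigned as shown above. AccessMode 2 selects "SQL command" on the OLE DB source, and the SqlCommand property has to contain a complete SQL statement rather than just a table name.

```csharp
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;

// Hypothetical helper, not part of the SSIS API.
static class OleDbSourceHelper
{
    // Switches an OLE DB source to the "SQL command" access mode
    // and refreshes its output columns from the query.
    public static void UseSqlCommand(CManagedComponentWrapper srcDesignTime, string sql)
    {
        // 0 = table or view (OpenRowset), 2 = SQL command.
        srcDesignTime.SetComponentProperty("AccessMode", 2);

        // Must be a full statement, e.g. "SELECT * FROM [Production].[Product]".
        srcDesignTime.SetComponentProperty("SqlCommand", sql);

        // Connect, rebuild the metadata from the query result, and disconnect.
        srcDesignTime.AcquireConnections(null);
        srcDesignTime.ReinitializeMetaData();
        srcDesignTime.ReleaseConnections();
    }
}
```

It would be called as OleDbSourceHelper.UseSqlCommand(srcDesignTime, "SELECT * FROM [Production].[Product]"); in place of the AccessMode and OpenRowset lines.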


8. Create a destination component in the Data Flow task.
See step 6.

             // Create the destination component.
        IDTSComponentMetaData100 destination =
                   dataFlowTask.ComponentMetaDataCollection.New();
        destination.ComponentClassID = "DTSAdapter.OleDbDestination";
        CManagedComponentWrapper destDesignTime = destination.Instantiate();
        destDesignTime.ProvideComponentProperties();


9. Connect the destination connection with the destination component.
See step 7.

       // Assign the connection manager.
       if (destination.RuntimeConnectionCollection.Count > 0)
       {
          destination.RuntimeConnectionCollection[0].ConnectionManager =
                             DTS.DtsConvert.GetExtendedInterface(ConnMgrImport_DB);
          destination.RuntimeConnectionCollection[0].ConnectionManagerID =
                             pkg.Connections["Import_DB"].ID;
       }

       // Set the custom properties of the destination
       destDesignTime.SetComponentProperty("AccessMode", 0);
       destDesignTime.SetComponentProperty("OpenRowset", "[dbo].[Product]");

       // Connect to the destination, and then update the metadata for the destination.
       destDesignTime.AcquireConnections(null);
       destDesignTime.ReinitializeMetaData();
       destDesignTime.ReleaseConnections();



10. Create the path between the source and the destination component.
Okay, now we have created a source component and a destination component. Now it's time to connect the components with each other. First we need to define a path interface. Path objects are created to establish the flow of data between the IDTSOutput100 of an upstream component and the IDTSInput100 of another component. A path contains a single output object, represented as the StartPoint, and a single input, which is the EndPoint of the path. A path between two components is established in a two-step process. First, create the path by calling New on the path collection of the MainPipe object (dataFlowTask in the code below). Second, establish the path by calling AttachPathAndPropagateNotifications on the path itself. This method establishes the path and notifies the affected components of its existence.

             // Create the path.
        IDTSPath100 path = dataFlowTask.PathCollection.New();
        path.AttachPathAndPropagateNotifications(source.OutputCollection[0],
                                destination.InputCollection[0]);


11. Map the fields from the source to the destination.
The next step is mapping the fields from the source component to the destination component. IDTSInput100 contains the collection of columns that represents the data provided to a component in the form of PipelineBuffer objects at run time. A connection between an IDTSInput100 and an IDTSOutput100 is established through the IDTSPath100 object. IDTSVirtualInput100 represents the columns available to a component from the upstream component. The virtual columns are selected for a component by calling the SetUsageType method of the CManagedComponentWrapper.

        // Get the destination's default input and virtual input.
         IDTSInput100 input = destination.InputCollection[0];
         int destinationInputID = input.ID;

         IDTSVirtualInput100 vInput = input.GetVirtualInput();

         // Iterate through the virtual input column collection.
         foreach (IDTSVirtualInputColumn100 vColumn in vInput.VirtualInputColumnCollection)
         {
            IDTSInputColumn100 vCol = destDesignTime.SetUsageType(input.ID, vInput,
                            vColumn.LineageID, DTSUsageType.UT_READWRITE);
            destDesignTime.MapInputColumn(input.ID, vCol.ID,
                            input.ExternalMetadataColumnCollection[vColumn.Name].ID);
         }
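
The loop above assumes that every source column has a destination column with exactly the same name; as far as I can tell, looking up a name that does not exist in ExternalMetadataColumnCollection throws an exception. Below is a minimal sketch of a more forgiving variant that skips unmatched columns. The class and method names (ColumnMapper, MapByName) are made up for illustration, and the case-insensitive name comparison is my own assumption about what is wanted.

```csharp
using System;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;

// Hypothetical helper, not part of the SSIS API.
static class ColumnMapper
{
    // Maps each upstream column to the destination column with the same name
    // and skips upstream columns that have no counterpart.
    public static void MapByName(CManagedComponentWrapper destDesignTime, IDTSInput100 input)
    {
        IDTSVirtualInput100 vInput = input.GetVirtualInput();

        foreach (IDTSVirtualInputColumn100 vColumn in vInput.VirtualInputColumnCollection)
        {
            // Search the destination's external columns by name instead of using
            // the string indexer, so a missing column does not raise an exception.
            IDTSExternalMetadataColumn100 target = null;
            foreach (IDTSExternalMetadataColumn100 ext in input.ExternalMetadataColumnCollection)
            {
                if (string.Equals(ext.Name, vColumn.Name, StringComparison.OrdinalIgnoreCase))
                {
                    target = ext;
                    break;
                }
            }

            if (target == null)
            {
                Console.WriteLine("No destination column for {0}, skipped", vColumn.Name);
                continue;
            }

            // Select the upstream column for this input, then map it.
            IDTSInputColumn100 inputColumn = destDesignTime.SetUsageType(
                input.ID, vInput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
            destDesignTime.MapInputColumn(input.ID, inputColumn.ID, target.ID);
        }
    }
}
```

It would be called as ColumnMapper.MapByName(destDesignTime, destination.InputCollection[0]); after the path has been attached.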



12. Write the package to a file (you can also write it to SQL Server).
The next step is writing the column names to the console (of course this is optional) and writing the SSIS package to the file system with SaveToXml.

       // Verify that the columns have been added to the input. 
       foreach (IDTSInputColumn100 inputColumn in    
               destination.InputCollection[0].InputColumnCollection)
           Console.WriteLine(inputColumn.Name);
       Console.Read();

       // In a verbatim (@) string a backslash is literal, so single backslashes are enough.
       app.SaveToXml(String.Format(@"E:\SSISProgram\{0}.dtsx", pkg.Name), pkg, null);

       Console.WriteLine("Package  {0} created", pkg.Name);
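
Writing the package to SQL Server instead of the file system goes through the same Application object. Below is a minimal sketch, assuming a local default instance and Windows authentication (null for user name and password); the class and method names (PackageStore, ValidateAndSave) are made up for illustration. It also validates the package first, which needs both databases to be reachable, and prints whatever ends up in the package's Errors collection.

```csharp
using System;
using DTS = Microsoft.SqlServer.Dts.Runtime;

// Hypothetical helper, not part of the SSIS API.
static class PackageStore
{
    // Validates the package and, when validation succeeds, saves it to msdb.
    public static bool ValidateAndSave(DTS.Application app, DTS.Package pkg, string serverName)
    {
        // Validate without executing, so no data is copied yet.
        DTS.DTSExecResult result = pkg.Validate(pkg.Connections, pkg.Variables, null, null);

        if (result != DTS.DTSExecResult.Success)
        {
            foreach (DTS.DtsError error in pkg.Errors)
                Console.WriteLine(error.Description);
            return false;
        }

        // null user name and password: use Windows authentication.
        app.SaveToSqlServer(pkg, null, serverName, null, null);
        return true;
    }
}
```

It would be called as PackageStore.ValidateAndSave(app, pkg, "(local)"); before the objects are disposed.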


13. Dispose the objects.
Cleaning up the objects and finishing the code.

           pkg.Dispose();
       app = null;
       source = null;
       destination = null;
       srcDesignTime = null;
       destDesignTime = null;



Conclusion
Well, it is quite a job to get this working. There is very little information and explanation available about programming SSIS packages, and especially about the Data Flow task.

Greetz,

Hennie


PS: the complete code:



// Create an application
DTS.Application app = new DTS.Application();

// Create a package
DTS.Package pkg = new DTS.Package();

//Setting some properties
pkg.Name = @"MyProgrammedDataflowTask";

//Adding a connection to the database AdventureWorks2008R2
DTS.ConnectionManager ConnMgrAdvent = pkg.Connections.Add("OLEDB");
ConnMgrAdvent.ConnectionString = @"Provider=SQLOLEDB.1;" +
    "Integrated Security=SSPI;Initial Catalog=AdventureWorks2008R2;" +
    "Data Source=(local);";
ConnMgrAdvent.Name = @"AdventureWorks2008R2";
ConnMgrAdvent.Description = @"SSIS Connection Manager for OLEDB Source AdventureWorks2008R2";

//Adding a connection to the database Import_DB
DTS.ConnectionManager ConnMgrImport_DB = pkg.Connections.Add("OLEDB");
ConnMgrImport_DB.ConnectionString = @"Provider=SQLOLEDB.1;" +
    "Integrated Security=SSPI;Initial Catalog=Import_DB;" +
    "Data Source=(local);";
ConnMgrImport_DB.Name = @"Import_DB";
ConnMgrImport_DB.Description = @"SSIS Connection Manager for OLEDB Destination Import_DB";


//Adding the dataflow task to the package
DTS.Executable exe = pkg.Executables.Add("STOCK:PipelineTask");
DTS.TaskHost TKHSQLHost = (DTS.TaskHost)exe;
TKHSQLHost.Name = "This is a programmed DataFlowTask";
MainPipe dataFlowTask = (MainPipe)TKHSQLHost.InnerObject;

// Create the source component.
IDTSComponentMetaData100 source =
    dataFlowTask.ComponentMetaDataCollection.New();
source.ComponentClassID = "DTSAdapter.OleDbSource";
CManagedComponentWrapper srcDesignTime = source.Instantiate();
srcDesignTime.ProvideComponentProperties();

// Assign the connection manager.
if (source.RuntimeConnectionCollection.Count > 0)
{

    source.RuntimeConnectionCollection[0].ConnectionManager = DTS.DtsConvert.GetExtendedInterface(ConnMgrAdvent);
    source.RuntimeConnectionCollection[0].ConnectionManagerID = pkg.Connections["AdventureWorks2008R2"].ID;

}

// Set the custom properties of the source.
//srcDesignTime.SetComponentProperty("AccessMode", 2);
//srcDesignTime.SetComponentProperty("SqlCommand", "Production.Product");
srcDesignTime.SetComponentProperty("AccessMode", 0);
srcDesignTime.SetComponentProperty("OpenRowset", "[Production].[Product]");

// Connect to the data source, and then update the metadata for the source.
srcDesignTime.AcquireConnections(null);
srcDesignTime.ReinitializeMetaData();
srcDesignTime.ReleaseConnections();

// Create the destination component.
IDTSComponentMetaData100 destination = dataFlowTask.ComponentMetaDataCollection.New();
destination.ComponentClassID = "DTSAdapter.OleDbDestination";
CManagedComponentWrapper destDesignTime = destination.Instantiate();
destDesignTime.ProvideComponentProperties();

// Assign the connection manager.
if (destination.RuntimeConnectionCollection.Count > 0)
{

    destination.RuntimeConnectionCollection[0].ConnectionManager = DTS.DtsConvert.GetExtendedInterface(ConnMgrImport_DB);
    destination.RuntimeConnectionCollection[0].ConnectionManagerID = pkg.Connections["Import_DB"].ID;

}

// Set the custom properties of the destination
destDesignTime.SetComponentProperty("AccessMode", 0);
destDesignTime.SetComponentProperty("OpenRowset", "[dbo].[Product]");

// Connect to the destination, and then update the metadata for the destination.
destDesignTime.AcquireConnections(null);
destDesignTime.ReinitializeMetaData();
destDesignTime.ReleaseConnections();


// Create the path.
IDTSPath100 path = dataFlowTask.PathCollection.New();
path.AttachPathAndPropagateNotifications(source.OutputCollection[0], destination.InputCollection[0]);

          

// Get the destination's default input and virtual input.
IDTSInput100 input = destination.InputCollection[0];
int destinationInputID = input.ID;

IDTSVirtualInput100 vInput = input.GetVirtualInput();

// Iterate through the virtual input column collection.
foreach (IDTSVirtualInputColumn100 vColumn in vInput.VirtualInputColumnCollection)
{
    IDTSInputColumn100 vCol = destDesignTime.SetUsageType(input.ID, vInput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
    destDesignTime.MapInputColumn(input.ID, vCol.ID, input.ExternalMetadataColumnCollection[vColumn.Name].ID);
}

// Verify that the columns have been added to the input.
foreach (IDTSInputColumn100 inputColumn in destination.InputCollection[0].InputColumnCollection)
    Console.WriteLine(inputColumn.Name);
Console.Read();

// In a verbatim (@) string a backslash is literal, so single backslashes are enough.
app.SaveToXml(String.Format(@"E:\SSISProgram\{0}.dtsx", pkg.Name), pkg, null);

Console.WriteLine("Package  {0} created", pkg.Name);

pkg.Dispose();

app = null;
source = null;
destination = null;
srcDesignTime = null;
destDesignTime = null;
