SqlBulkCopy object collection to database table with C#

Performing SqlBulkCopy with a collection of objects as the source data

Over time, various techniques for manipulating database data have evolved in .NET. Entity Framework is certainly the most advanced one, but for small ad hoc applications with only a few inserts to deal with, ADO.NET is still the fastest way to write data to the database. Especially for small projects that can be scheduled to run as a Windows Scheduled Task or a cron job, adding Entity Framework is overkill.

On the other hand, Entity Framework is not known as the best performer, although it gives you freedom with data manipulation and an abstraction on top of the data through LINQ querying. If you are working with Microsoft SQL Server, the most efficient way to write multiple records to a database table is definitely SqlBulkCopy. Unfortunately, Entity Framework does not support SqlBulkCopy out of the box, but there are NuGet packages which provide extension methods for it, such as the EFCore.BulkExtensions package.
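
For illustration, with the EFCore.BulkExtensions package the whole operation boils down to roughly a one-liner. This is a minimal sketch, assuming a hypothetical AppDbContext and an employees list; BulkInsertAsync is the extension method the package adds on top of DbContext.

    using EFCore.BulkExtensions;

    // AppDbContext and the employees list are assumptions for this sketch;
    // BulkInsertAsync wraps SqlBulkCopy internally when targeting SQL Server
    using (var context = new AppDbContext())
    {
        await context.BulkInsertAsync(employees);
    }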

One way or another, using ADO.NET for small data pump projects is the most lightweight solution you can go with when building these kinds of micro projects, and with it the obvious approach for inserting a large number of records is SqlBulkCopy.

However, performance does not come without cost, and in the case of SqlBulkCopy the cost boils down to the following points:

  • Column order - the order of the columns you are inserting with SqlBulkCopy needs to exactly match the order of the columns in the database table
  • Column names - the column names of the DataTable C# object need to exactly match the names of the database table columns

Because of these two points, working with bulk copy is definitely no one's favorite way to write data to the database, but it remains the most efficient way to write multiple records at once.
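
To make these two constraints concrete, here is a minimal plain ADO.NET sketch, assuming a placeholder dbo.People table and a valid connectionString, where the DataTable columns mirror the destination table in both name and order:

    // Minimal sketch: column names and order mirror the destination table
    // (dbo.People and connectionString are placeholders for this example)
    var table = new DataTable();
    table.Columns.Add("Name", typeof(string));   // column 1 in dbo.People
    table.Columns.Add("Email", typeof(string));  // column 2 in dbo.People
    table.Rows.Add("Alice", "alice@test.com");

    using (var connection = new SqlConnection(connectionString))
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        connection.Open();
        bulkCopy.DestinationTableName = "dbo.People";
        bulkCopy.WriteToServer(table);
    }

Strictly speaking, the SqlBulkCopy.ColumnMappings collection can reconcile differences in order or naming, but without explicit mappings the safest approach is to keep both aligned as described above.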

Note

In order to use SqlBulkCopy with SQL Server in .NET Core you need to add a dependency on the System.Data.SqlClient NuGet package (for example by running dotnet add package System.Data.SqlClient), unlike .NET Framework where it is already bundled with the framework itself.

To overcome this, I wrote a small class that makes SqlBulkCopy a bit easier to use. You still need to take care of the names and order, but in a more elegant way: by taking care of the property names and order on a strongly typed model, instead of doing it at the level of parameters or DataTable columns. There is no better way to work with the properties of an object and make the code generic and reusable than reflection.

Mapping model to DataTable using reflection

So in simple words, what I basically did is transform a POCO class into a DataTable instance which you can then use to perform SqlBulkCopy.

    using System;
    using System.Collections.Generic;
    using System.Data;
    using System.Data.SqlClient;
    using System.Linq;
    using System.Threading.Tasks;

    public class ModelBulkInsert<T> where T : class
    {
        readonly String connectionString;

        public ModelBulkInsert(String connectionString)
        {
            this.connectionString = connectionString;
        }

        public async Task BulkInsert(IEnumerable<T> items, String tableName)
        {
            using (var connection = new SqlConnection(connectionString))
            using (var dataTable = new DataTable())
            {
                // Build the table structure from the model's public properties;
                // nullable types are unwrapped because DataColumn cannot take them
                var columns = typeof(T).GetProperties()
                    .Select(p => new DataColumn(p.Name, Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType))
                    .ToArray();
                dataTable.Columns.AddRange(columns);
                dataTable.AcceptChanges();

                await connection.OpenAsync();
                using (var bulkCopy = new SqlBulkCopy(connection))
                {
                    bulkCopy.DestinationTableName = tableName;

                    // Copy every model instance into a DataRow via reflection,
                    // substituting DBNull for null property values
                    var rows = items.Select(r =>
                    {
                        var row = dataTable.NewRow();
                        Array.ForEach(typeof(T).GetProperties(), p =>
                        {
                            row[p.Name] = p.GetValue(r) ?? DBNull.Value;
                        });
                        return row;
                    });
                    foreach (var row in rows)
                    {
                        dataTable.Rows.Add(row);
                    }
                    dataTable.AcceptChanges();

                    if (dataTable.Rows.Count > 0)
                    {
                        await bulkCopy.WriteToServerAsync(dataTable);
                    }
                }
            }
        }
    }
    

Now you have probably gained some time in the data import process, but reflection is still quite an expensive operation, and a lot of time is spent performing reflection tasks in this code. Since we are performing it for every object instance, it can significantly hurt performance, especially if we have a large number of records to store in the database table.

To overcome this performance problem, we need to minimize the usage of reflection in the code and avoid the time spent on reflection operations.
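
If you want to quantify this yourself, a simple Stopwatch measurement around the BulkInsert call is enough. A sketch, assuming an Employee model (shown in the demo below), an employees collection, and a valid connection string:

    // Rough timing sketch around the reflection-based implementation
    var bulkInsert = new ModelBulkInsert<Employee>(connectionString);

    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    await bulkInsert.BulkInsert(employees, "Employees");
    stopwatch.Stop();

    Console.WriteLine($"Bulk insert took {stopwatch.ElapsedMilliseconds} ms");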

Reflection performance fix

There are several places where we can limit reflection usage to the initial calls only and store the reflection results for reuse. Our ModelBulkInsert<T> class is tightly bound to the model type T, so each class instance is aware of the type it takes care of. Because of this we can cache the reflection results per model type.

Here are the reflection operations we can limit to the first call only:

  • Create the empty table with columns only once and later on just clone that table instance
  • Cache the PropertyInfo instances of the type in a collection
  • Create delegates for the property get methods and reuse the delegates rather than running reflection for every property

Since there are a couple of things we are going to introduce code for, it is a good idea to move the aggregated reflection objects out of the ModelBulkInsert class into a separate class and do this work in its constructor.

    using System;
    using System.Collections.Generic;
    using System.Data;
    using System.Linq;
    using System.Linq.Expressions;
    using System.Reflection;

    public class ReflectionCache<T>
    {
        public Type ModelType { get; private set; }
        public IDictionary<PropertyInfo, Func<T, object>> ModelTypeProperties { get; private set; }
        public DataTable ModelTypeTable { get; private set; }

        public ReflectionCache()
        {
            //Cache the type
            this.ModelType = typeof(T);

            //Cache PropertyInfo instances coupled with compiled getter delegates
            this.ModelTypeProperties = ModelType.GetProperties().ToDictionary(k => k, v =>
            {
                var expressionParam = Expression.Parameter(ModelType);
                var getterDelegateExpression = Expression.Lambda<Func<T, object>>(
                    Expression.Convert(
                        Expression.Property(expressionParam, v.Name),
                        typeof(object)
                    ),
                    expressionParam
                ).Compile();
                return getterDelegateExpression;
            });

            //Create the structured empty table for the type
            ModelTypeTable = new DataTable();
            var columns = this.ModelTypeProperties.Keys
                .Select(p => new DataColumn(p.Name, Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType))
                .ToArray();
            ModelTypeTable.Columns.AddRange(columns);
            ModelTypeTable.AcceptChanges();
        }
    }
    

One of the reasons this logic is taken out of the ModelBulkInsert class is reusability. Once you create an instance of ReflectionCache for a specific type, you can reuse it later in your code whenever you need to deal with reflection related to that type.
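
For example, a single cached instance can serve any code path that needs the type's table layout or property getters. A minimal sketch, assuming the Employee model from the demo below:

    // Created once per model type and reused afterwards
    var cache = new ReflectionCache<Employee>();

    // Clone() copies only the column structure, not the rows,
    // so every caller gets a fresh, correctly shaped table
    DataTable batchTable = cache.ModelTypeTable.Clone();

    // The compiled getters replace PropertyInfo.GetValue on hot paths
    var employee = new Employee { FirstName = "Nancy" };
    foreach (var pair in cache.ModelTypeProperties)
    {
        Console.WriteLine($"{pair.Key.Name} = {pair.Value(employee)}");
    }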

Now that we have taken reflection out of each and every object's processing, we can change the BulkInsert method to use our ReflectionCache class instead of reflection directly.


    public class ModelBulkInsert<T> where T : class
    {
        ReflectionCache<T> reflectionCache;
        readonly String connectionString;

        public ModelBulkInsert(String connectionString)
        {
            this.connectionString = connectionString;
        }

        public async Task BulkInsert(IEnumerable<T> items, String tableName)
        {
            using (var connection = new SqlConnection(connectionString))
            {
                // Reflection runs only once per model type; subsequent calls reuse the cache
                if (reflectionCache == null)
                {
                    reflectionCache = new ReflectionCache<T>();
                }

                // Clone copies the cached column structure without any rows
                using (var dataTable = reflectionCache.ModelTypeTable.Clone())
                {
                    await connection.OpenAsync();
                    using (var bulkCopy = new SqlBulkCopy(connection))
                    {
                        bulkCopy.DestinationTableName = tableName;

                        // Populate rows through the cached, compiled property getters
                        var rows = items.Select(r =>
                        {
                            var row = dataTable.NewRow();
                            foreach (var pair in reflectionCache.ModelTypeProperties)
                            {
                                row[pair.Key.Name] = pair.Value(r) ?? DBNull.Value;
                            }
                            return row;
                        });
                        foreach (var row in rows)
                        {
                            dataTable.Rows.Add(row);
                        }
                        dataTable.AcceptChanges();

                        if (dataTable.Rows.Count > 0)
                        {
                            await bulkCopy.WriteToServerAsync(dataTable);
                        }
                    }
                }
            }
        }
    }

    

The constructor takes care of the heavy lifting of accessing reflection, so we do not use it for each and every model instance in the collection. This way the same piece of code is not bound to a single model type, and at the same time there are no performance drawbacks apart from the one-time cost in the class constructor.

Demo

To test this code I created a simple table to hold employee records.

CREATE TABLE Employees(
	Id BIGINT PRIMARY KEY IDENTITY,
	FirstName VARCHAR(50) NOT NULL,
	LastName VARCHAR(50) NOT NULL,
	BirthDate DATE,
	Email VARCHAR(150)
)
    

According to the table structure, our model POCO class will look as follows. Although we are not inserting the value of the Id column, we still have to keep it to maintain the same structure for the SqlBulkCopy operation.

    public class Employee
    {
        public long Id { get; set; }
        public String FirstName { get; set; }
        public String LastName { get; set; }
        public DateTime BirthDate { get; set; }
        public String Email { get; set; }
    }
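
As a side note, SqlBulkCopy ignores source identity values by default and lets SQL Server generate them, which is why the zero Id values coming from the model do no harm here. If you ever needed to preserve the source Id values instead, you would pass the KeepIdentity option when constructing the copier. A sketch, not used in this article's code:

    // Only needed when source Id values must be preserved;
    // without this option the destination generates the identity values
    using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null))
    {
        bulkCopy.DestinationTableName = "Employees";
        bulkCopy.WriteToServer(dataTable);
    }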
    

Using ModelBulkInsert<T> is the same as with the reflection-based version. Since the reflection caching is held internally, there is no need to change anything in the method invocation.

    class Program
    {
        
        static async Task Main(string[] args)
        {
            ModelBulkInsert<Employee> modelBulkInsert = new ModelBulkInsert<Employee>("Data Source=.\\SQLEXPRESS;Initial Catalog=ProductsCatalog;Integrated Security=SSPI;");

            //BATCH ONE

            List<Employee> employeesBatch1 = new List<Employee>()
            {
                new Employee(){FirstName="Nancy",LastName="Davolio",BirthDate=DateTime.Parse("1948-12-08"),Email="nancy.davolio@test.com"},
                new Employee(){FirstName="Andrew",LastName="Fuller",BirthDate=DateTime.Parse("1952-02-19"),Email="andrew.fuller@test.com"},
                new Employee(){FirstName="Janet",LastName="Leverling",BirthDate=DateTime.Parse("1963-08-30"),Email="janet.leverling@test.com"},
                new Employee(){FirstName="Margaret",LastName="Peacock",BirthDate=DateTime.Parse("1937-09-19"),Email="margaret.peacock@test.com"},
                new Employee(){FirstName="Steven",LastName="Buchanan",BirthDate=DateTime.Parse("1955-03-04"),Email="steven.buchanan@test.com"}              
            };
            await modelBulkInsert.BulkInsert(employeesBatch1, "Employees");

            //BATCH TWO
            List<Employee> employeesBatch2 = new List<Employee>()
            {
                new Employee(){FirstName="Michael",LastName="Suyama",BirthDate=DateTime.Parse("1963-07-02"),Email="michael.suyama@test.com"},
                new Employee(){FirstName="Robert",LastName="King",BirthDate=DateTime.Parse("1960-05-29"),Email="robert.king@test.com"},
                new Employee(){FirstName="Laura",LastName="Callahan",BirthDate=DateTime.Parse("1958-01-09"),Email="laura.callahan@test.com"},
                new Employee(){FirstName="Anne",LastName="Dodsworth",BirthDate=DateTime.Parse("1966-01-27"),Email="anne.dodsworth@test.com"}
            };
            await modelBulkInsert.BulkInsert(employeesBatch2, "Employees");

        }
    }
    

Once the code is executed we'll have all the records in the table, and each batch is inserted as a whole rather than record by record, which significantly improves performance and resource consumption.


Disclaimer

The purpose of the code contained in snippets or available for download in this article is solely learning and demo purposes. The author will not be held responsible for any failure or damages caused by any other usage.

