在一些场景下,我们需要大批量插入或者更新数据,我们知道对数据库的操作开销很大,比如要先建立连接、传输数据、获取返回结果、关闭连接等。在API设计的时候我们也应该提供批量数据的处理操作,而不是让用户循环调用单个数据API,以减少不必要的开销。

    在SQLServer批量插入操作中,有两种方法一种是表值参数存储过程,一种是利用SqlBulkCopy,这两种方法的共同点是,他们的参数都是DataTable,这里以我在最近编写的一个数据库表同步的例子来说明这两种方法。

TVP方法


    TVP(全称 :Table-Valued Parameter)叫做表值参数(Table-Valued Parameter),它是SQLServer 2008的一个新特性(还有一些SQLServer的特性比如通用表表达式(Common Table Express, CTE)在某些时候比如树状类型的递归时也比较有用)。顾名思义,表值参数表示你可以把一个表类型作为参数传递到函数或存储过程里。为了举例,这里先创建一个Chart表,用来存储股票的1分钟K线数据:

CREATE TABLE [dbo].[Chart](
	[ID] [int] IDENTITY(1,1) NOT NULL,
	[StockCode] [nvarchar](50) NOT NULL,
	[Open] [float] NOT NULL,
	[High] [float] NOT NULL,
	[Low] [float] NOT NULL,
	[Close] [float] NOT NULL,
	[Volume] [int] NOT NULL,
	[Time] [datetime] NOT NULL
) ON [PRIMARY]

     紧接着,创建一个Table类型:

CREATE Type ChartTableType as Table(
	        [StockCode] [nvarchar](50) NOT NULL,
	        [Open] [float] NOT NULL,
	        [High] [float] NOT NULL,
	        [Low] [float] NOT NULL,
	        [Close] [float] NOT NULL,
	        [Volume] [int] NOT NULL,
	        [Time] [datetime] NOT NULL)

     可以看到,这里面除了自增的ID没有创建之外,其他的跟Chart表的字段定义完全一致,我们可以在Chart表的创建脚本上做简单的修改。

     紧接着创建存储过程:

CREATE PROCEDURE dbo.usp_TVPInsertChart
  @TVP ChartTableType READONLY
  AS
  SET NOCOUNT ON
  INSERT INTO Chart([StockCode]
       ,[Open]
       ,[High]
       ,[Low]
       ,[Close]
       ,[Volume]
       ,[Time])
  SELECT [StockCode]
       ,[Open]
       ,[High]
       ,[Low]
       ,[Close]
       ,[Volume]
       ,[Time]
  FROM @TVP;

    存储过程中,使用的上面创建的表值参数,并将其定义为了readonly,该存储过程就是把该表值参数里面的数据,插入到Chart表里。操作完成之后,我们可以在SQLServer可编程看到。

    接下来,在C#里面可以直接编写代码操作了。

DataTable dt = GetFillChartTypeDateTable(charts);
using (IDbConnection connection = new SqlConnection(connStr))
{
    connection.Execute(procedureName, new { 
        TVP = dt.AsTableValuedParameter("dbo.ChartTableType") }, 
        commandType: CommandType.StoredProcedure);
}

    首先,需要构建一个DataTable,类型和名称需要跟SQLServer里的Chart表一致,然后将待插入的数据放到DataTable里,方法如下:

private DataTable GetFillChartTypeDateTable(List<SyncData> insert)
{
    DataTable chartTypeTable = new DataTable();
    chartTypeTable.Columns.Add("StockCode", typeof(string));
    chartTypeTable.Columns.Add("Open", typeof(double));
    chartTypeTable.Columns.Add("High", typeof(double));
    chartTypeTable.Columns.Add("Low", typeof(double));
    chartTypeTable.Columns.Add("Close", typeof(double));
    chartTypeTable.Columns.Add("Volume", typeof(int));
    chartTypeTable.Columns.Add("Time", typeof(DateTime));
    foreach (SyncData chart in insert)
    {
        chartTypeTable.Rows.Add(chart.StockCode, chart.Open, chart.High, chart.Low, chart.Close, chart.Volume, chart.Time);
    }
    return chartTypeTable;
}

    当然这个DataTable表也可以使用反射来进行动态创建,比如可以根据传进来的SyncData,反射其字段和类型,动态创建一个DataTable,但是要注意,SyncData的字段必须要跟数据库里定义的ChartTableType一致,不一致会导致抛出异常。自定义DataTable灵活性更高些,但是繁琐。在后面使用SQLBulkCopy中我会演示根据实体动态构建DataTable。

    紧接着,调用存储过程,将Table作为参数传递进去,我这里使用的是Dapper,很简单,直接执行Execute方法,传入存储过程名称,然后新建一个匿名内,属性为TVP,他就是我们在存储过程里面定义的参数名称,然后指定表值类型为ChartTableType。

    如果不使用Dapper,需要新定义一下SQLServerParameter:

SqlParameter[] parameters = new SqlParameter[1];
parameters[0] = new SqlParameter();
parameters[0].TypeName = "dbo.ChartTableType";
parameters[0].Value = dt;
parameters[0].ParameterName = "@TVP";
sqlServerHelper.ExecuteNonQuery(targetDbConnection, CommandType.StoredProcedure, procedureName, parameters);

     整个完整的过程,可以先创建表值类型、存储过程,然后调用程序过程,最后删除存储过程、表值类型(不能直接修改被其他存储过程引用的表值类型,一定要先删除所有的引用了表值类型的村存储过程最后再删除表值类型,否则会报错)。整个代码如下,这里使用了Dapper这个类库:

public string CreateTVPProcedure(string connStr)
{
    List<string> sqlText = new List<string>();
    string procedureName = "";
    procedureName = "usp_TVPInsertChart";
    sqlText.Add(@"CREATE Type ChartTableType as Table(
	                        [StockCode] [nvarchar](50) NOT NULL,
	                        [Open] [float] NOT NULL,
	                        [High] [float] NOT NULL,
	                        [Low] [float] NOT NULL,
	                        [Close] [float] NOT NULL,
	                        [Volume] [int] NOT NULL,
	                        [Time] [datetime] NOT NULL
                        )");

    sqlText.Add(@" CREATE PROCEDURE dbo.usp_TVPInsertChart
                           @TVP ChartTableType READONLY
                              AS
                              SET NOCOUNT ON
                              INSERT INTO Chart([StockCode]
                                   ,[Open]
                                   ,[High]
                                   ,[Low]
                                   ,[Close]
                                   ,[Volume]
                                   ,[Time])
                              SELECT [StockCode]
                                   ,[Open]
                                   ,[High]
                                   ,[Low]
                                   ,[Close]
                                   ,[Volume]
                                   ,[Time]
                              FROM @TVP;
                          ");

    using (IDbConnection connection = new SqlConnection(connStr))
    {
        foreach (var sql in sqlText)
        {
            connection.Execute(sql);
        }
    }
    return procedureName;
}

public void BatchInsert(string connStr,  string procedureName, List<SyncData> charts)
{
    DataTable dt = GetFillChartTypeDateTable(charts);
    using (IDbConnection connection = new SqlConnection(connStr))
    {
        connection.Execute(procedureName, new { TVP = dt.AsTableValuedParameter("dbo.ChartTableType") }, commandType: CommandType.StoredProcedure);
    }
}

public void DropTVPProcedure(string connStr)
{
    string sqlText = @"if Exists(select name from sysobjects where NAME = 'usp_TVPInsertChart' and type='P')
                                   drop procedure usp_TVPInsertChart 
                            IF EXISTS (SELECT 1 FROM sys.types t 
                                   join sys.schemas s on t.schema_id=s.schema_id 
                                   and t.name='ChartTableType' and s.name='dbo')
                            DROP TYPE dbo.ChartTableType";
   
    if (!string.IsNullOrEmpty(sqlText))
    {
        using (IDbConnection connection = new SqlConnection(connStr))
        {
            connection.Execute(sqlText);
        }
    }
}

    这是模板方法模式的一个很好应用,可以在基类中定义一个虚方法PreProcess,在派生类中重写该方法,调用CreateTVPProcedure方法,然后执行BatchInsert,最后执行PostProcess,在该方法中调用DropTVPProcedure方法。

SQLBulkCopy


    批量插入操作的另外一种方法是使用SqlBulkCopy,使用方法很简单,首先要定义一个SqlBulkCopy对象,然后定义DataTable和数据库表的映射关系,这里为了方便,两者保持了一致。

    在生成DataTable的时候,这里使用了反射,利用传进来的泛型类型T,动态构造了一个DataTable,并将待传输的数据填充到DataTable中,最后调用WirteToServer方法。如下:

public void InsertBatchTest<T>(string connStr, TableNames tableName, IEnumerable<T> entityList, IDbTransaction transaction = null) where T : class
{
    using (IDbConnection conn = new SqlConnection(connStr))
    {
        var tbName = string.Format("dbo.{0}", tableName.ToString());
        var trans = (SqlTransaction)transaction;
        using (var bulkCopy = new SqlBulkCopy(conn as SqlConnection, SqlBulkCopyOptions.TableLock, trans))
        {
            bulkCopy.BatchSize = entityList.Count();
            bulkCopy.BulkCopyTimeout = 60;
            bulkCopy.DestinationTableName = tbName;
            var table = new DataTable();
            PropertyDescriptorCollection props = TypeDescriptor.GetProperties(typeof(T));
            foreach (PropertyDescriptor propertyInfo in props)
            {
                bulkCopy.ColumnMappings.Add(propertyInfo.Name, propertyInfo.Name);
                table.Columns.Add(propertyInfo.Name, Nullable.GetUnderlyingType(propertyInfo.PropertyType) ?? propertyInfo.PropertyType);
            }
            var values = new object[props.Count()];
            foreach (T itemm in entityList)
            {
                for (var i = 0; i < values.Length; i++)
                {
                    values[i] = props[i].GetValue(itemm);
                }
                table.Rows.Add(values);
            }
            try
            {
                conn.Open();
                bulkCopy.WriteToServer(table);
            }
            catch (Exception ex)
            {
                conn.Close();
                conn.Dispose();
            }
        }
    }
}

     对比TVP方案,这里不需要预先创建DataTable,更简洁一点,性能的话,两者差别不大。

总结


     在有些场景下,我们需要批量插入数据,本文介绍了SQLServer中的两种比较高效的批量数据操作方法,它们是TVP表值类型存储过程以及SqlBulkCopy,他们的共同点是通过传输DataTable作为参数,批量插入数据,不同点在于TVP方法需要SQLServer 2008及以上版本,并且需要预先创建表值类型和存储过程,SqlBulkCopy方法则更简洁,不需要额外创建类型或者存储过程。