zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

SQLServer · 最佳实践 · 数据库实现大容量插入的几种方式

SQLServer数据库 实现 方式 实践 最佳 几种 插入
2023-09-14 09:02:09 时间

很多用户在使用阿里云云数据库SQL Server时,为了加快插入速度,都尝试使用大容量插入的方式,大家都知道,对于完整恢复模式下的数据库,大容量导入执行的所有行插入操作都会完整地记录在事务日志中。如果使用完整恢复模式,大型数据导入会导致填充事务日志的速度很快。相反,对于简单恢复模式或大容量日志恢复模式,大容量导入操作的按最小方式记录日志减少了大容量导入操作填满日志空间的可能性。另外,按最小方式记录日志的效率也比按完整方式记录日志高 。
但实际上,当大容量导入与数据库镜像共存时,会出现镜像 Suspend的情况,这个情况是由于微软在2008 R2上的BUG导致,详细你可以了解 https://support.microsoft.com/en-us/kb/2700641 ,微软已经明确表示在2008 R2不会FIXED,那么如何正确在RDS使用大容量导入并避免镜像异常,下面介绍几种方式.

通过ADO.NET SQLBulkCopy 方式
只需要将SqlBulkCopy 指定SqlBulkCopyOptions.CheckConstraints就好,即:SqlBulkCopy blkcpy = new SqlBulkCopy(desConnString, SqlBulkCopyOptions.CheckConstraints)

例如:将本地的一个大表通过SQLBulkCopy方式导入到RDS的实例中

static void Main()

 string srcConnString = "Data Source=(local);Integrated Security=true;

 Initial Catalog=testdb";

 string desConnString = "Data Source=****.sqlserver.rds.aliyuncs.com,3433;

 UserID=**;Password=**;Initial Catalog=testdb";

 SqlConnection srcConnection = new SqlConnection();

 SqlConnection desConnection = new SqlConnection();

 SqlCommand sqlcmd = new SqlCommand();

 SqlDataAdapter da = new SqlDataAdapter();

 DataTable dt = new DataTable();

 srcConnection.ConnectionString = srcConnString;

 desConnection.ConnectionString = desConnString;

 sqlcmd.Connection = srcConnection;

 sqlcmd.CommandText = @"

 SELECT top 1000000 [PersonType],[NameStyle],[Title],[FirstName],[MiddleName],

 [LastName] ,[Suffix],[EmailPromotion],[AdditionalContactInfo],[Demographics],NULL 

 as rowguid,[ModifiedDate] FROM [testdb].[dbo].[Person]";

 sqlcmd.CommandType = CommandType.Text;

 sqlcmd.Connection.Open();

 da.SelectCommand = sqlcmd;

 da.Fill(dt);


using (SqlBulkCopy blkcpy = new SqlBulkCopy(desConnString,SqlBulkCopyOptions.CheckConstraints)) // using (SqlBulkCopy blkcpy = // new SqlBulkCopy(desConnString, SqlBulkCopyOptions.Default)) blkcpy.BatchSize = 2000; blkcpy.BulkCopyTimeout = 5000; blkcpy.SqlRowsCopied += new SqlRowsCopiedEventHandler(OnSqlRowsCopied); blkcpy.NotifyAfter = 2000; foreach (DataColumn dc in dt.Columns) blkcpy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName); blkcpy.DestinationTableName = "Person"; blkcpy.WriteToServer(dt); catch (Exception ex) Console.WriteLine(ex.Message); finally sqlcmd.Clone(); srcConnection.Close(); desConnection.Close(); private static void OnSqlRowsCopied( object sender, SqlRowsCopiedEventArgs e) Console.WriteLine("Copied {0} so far...", e.RowsCopied);
通过JDBC SQLServerBulkCopy 方式
同样的道理,需要在copyOptions指定检查约束性

SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions(); 

copyOptions.setCheckConstraints(true);

测试时,请用Microsoft JDBC Drivers 6.0 的sqljdbc41.jar,sqljdbc4.jar及更老版本没有SQLServerBulkCopy 实现。

例如: 将本地的一个大表通过SQLServerBulkCopy方式导入到RDS的实例中


import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy; import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions; public class Program { public static void main(String[] args) String sourceConnectionString = "jdbc:sqlserver://localhost:1433;" + "databaseName=testdb;user=****;password=****"; String destConnectionString = "jdbc:sqlserver://*****.sqlserver.rds.aliyuncs.com:3433;" + "databaseName=testdb;user=****;password=**** "; try Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); try (Connection sourceConnection = DriverManager.getConnection(sourceConnectionString)) try (Statement stmt = sourceConnection.createStatement())
" SELECT top 1000000 " + "[PersonType],[NameStyle],[Title],[FirstName],[MiddleName],[LastName] ," + "[Suffix],[EmailPromotion],[AdditionalContactInfo]," + "[Demographics],NULL as rowguid,[ModifiedDate] " + "FROM [testdb].[dbo].[Person]")) try (Connection destinationConnection = DriverManager.getConnection(destConnectionString)) Statement stmt1 = destinationConnection.createStatement(); long countStart = 0; try (ResultSet rsRowCount = stmt1.executeQuery( "SELECT COUNT(*) FROM dbo.Person;")) rsRowCount.next(); countStart = rsRowCount.getInt(1); System.out.println("Starting row count = " + countStart); try (SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(destinationConnection)) SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions(); copyOptions.setKeepIdentity(true); copyOptions.setBatchSize(2000); copyOptions.setBulkCopyTimeout(5000); //this is importance setting copyOptions.setCheckConstraints(true); bulkCopy.setBulkCopyOptions(copyOptions); bulkCopy.setDestinationTableName("dbo.Person"); bulkCopy.addColumnMapping("PersonType", "PersonType"); bulkCopy.addColumnMapping("NameStyle", "NameStyle"); bulkCopy.addColumnMapping("Title", "Title"); bulkCopy.addColumnMapping("FirstName", "FirstName"); bulkCopy.addColumnMapping("MiddleName", "MiddleName"); bulkCopy.addColumnMapping("LastName", "LastName"); bulkCopy.addColumnMapping("Suffix", "Suffix"); bulkCopy.addColumnMapping("EmailPromotion", "EmailPromotion"); bulkCopy.addColumnMapping("AdditionalContactInfo", "AdditionalContactInfo"); bulkCopy.addColumnMapping("Demographics", "Demographics"); bulkCopy.addColumnMapping("rowguid", "rowguid"); bulkCopy.addColumnMapping("ModifiedDate", "ModifiedDate"); try bulkCopy.writeToServer(rsSourceData); catch (Exception e) e.printStackTrace();
try (ResultSet rsRowCount = stmt1.executeQuery( "SELECT COUNT(*) FROM dbo.Person;")) rsRowCount.next(); long countEnd = rsRowCount.getInt(1); System.out.println("Ending row count = " + countEnd); System.out.println((countEnd - countStart) + " rows were added."); catch (Exception e) e.printStackTrace();

第一步:需要将数据BCP到本地

BCP testdb.dbo.person Out "bcp_data" /t /N /U **** /P *** /S "****.sqlserver.rds.aliyuncs.com,3433" 

第二步:将导出的文件直接导入到RDS的实例中,但需要指定提示:/h "CHECK_CONSTRAINTS"

BCP testdb.dbo.person In "bcp_data" /C /N /q /k /h "CHECK_CONSTRAINTS" /U *** /P *** /b 500 /S "***.sqlserver.rds.aliyuncs.com,3433" 

通过DTS/SSIS方式

第一种:import/export data方式需要先保存SSIS包,然后修改Connection Manager的属性 ,如下图
1111111

第二种:直接使用SQL Server Business Intelligence Development Stuidio新建 SSIS包:

222222

不能在RDS通过下列两种方式进行大容量插入 :原因是基于安全考虑不提供上传文件到RDS 数据库服务器。

第一种:

BULK INSERT testdb.dbo.person_in

FROM ND:\trace\bcp.txt

 CHECK_CONSTRAINTS 

); 

第二种:

INSERT ... SELECT * FROM OPENROWSET(BULK...)

大容量导入数据会带来更快的插入,解决了用户在有大量数据导入缓慢困惑,在阿里云数据库中,你可以使用五种方式来实现业务场景,但是基于镜像的主备关系,需要特别加入一个检查约束的选项,这是写这个最佳实践的目的,一旦镜像SUSPEND,不断有DUMP文件产生,一来需要时间来修正,二来DUMP文件也会不断占用空间,但不会影响用户的可用性和可靠性。有两种方式在RDS中不能实现,另外,还可以通过ODBC来实现大容量导入,具体请参见https://msdn.microsoft.com/en-us/library/ms403302.aspx。希望这些对大家有用,特别是阿里云云数据库使用用户。


SQLServer数据库文件相关知识笔记 数据库文件是SQLServer数据库的物理体现,和计算机的普通文件一样存储在计算机的磁盘空间当中。作为数据库记录和日志等其他信息的存储载体。
数据库:SQLServer 实现行转列、列转行用法笔记 官方解释:可以使用 PIVOT 和 UNPIVOT 关系运算符将表值表达式更改为另一个表。PIVOT 通过将表达式某一列中的唯一值转换为输出中的多个列来旋转表值表达式,并在必要时对最终输出中所需的任何其余列值执行聚合。UNPIVOT 与 PIVOT 执行相反的操作,将表值表达式的列转换为列值。
石沫 长期在电子商务行业从事SQL Server的设计,开发与维护,拥有10年的相关经验,擅长数据库的架构与设计,擅长数据库的性能优化,擅长数据库的自动化和智能化运维,从2014年开始, 在云计算领域坚持奋斗, 阿里云SQL Server系列产品的设计与规划者