forked from ThatRendle/Simple.Data
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSqlBulkInserter.cs
More file actions
122 lines (103 loc) · 4.25 KB
/
SqlBulkInserter.cs
File metadata and controls
122 lines (103 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Simple.Data.SqlServer
{
using System.ComponentModel.Composition;
using System.Data;
using System.Data.SqlClient;
using Ado;
[Export(typeof(IBulkInserter))]
public class SqlBulkInserter : IBulkInserter
{
    /// <summary>
    /// Number of rows buffered in the in-memory <see cref="DataTable"/> before they are
    /// flushed to the server with a single <c>WriteToServer</c> call.
    /// </summary>
    private const int BatchSize = 5000;

    /// <summary>
    /// Inserts <paramref name="data"/> into <paramref name="tableName"/> using <see cref="SqlBulkCopy"/>.
    /// </summary>
    /// <param name="adapter">Adapter that supplies the schema, the ADO options and (when no transaction is given) the connection.</param>
    /// <param name="tableName">Name of the destination table, resolved through the adapter's schema.</param>
    /// <param name="data">Records to insert; each record maps column names to values.</param>
    /// <param name="transaction">Optional transaction. When supplied it must be a <see cref="SqlTransaction"/>; its connection is used for the copy.</param>
    /// <param name="onError">Error callback. It is only forwarded to the fallback <c>BulkInserter</c>; the SqlBulkCopy path does not invoke it.</param>
    /// <param name="resultRequired">When true the inserted rows must be returned, so the work is delegated to <c>BulkInserter</c>.</param>
    /// <returns>
    /// The result of <c>BulkInserter.Insert</c> when <paramref name="resultRequired"/> is true;
    /// otherwise <c>null</c>.
    /// </returns>
    public IEnumerable<IDictionary<string, object>> Insert(AdoAdapter adapter, string tableName, IEnumerable<IDictionary<string, object>> data, IDbTransaction transaction, Func<IDictionary<string, object>, Exception, bool> onError, bool resultRequired)
    {
        if (resultRequired)
        {
            return new BulkInserter().Insert(adapter, tableName, data, transaction, onError, resultRequired);
        }

        // Resolve the destination before any connection or SqlBulkCopy exists, so that a
        // failed schema lookup cannot leave an undisposed resource behind.
        var destinationTableName = adapter.GetSchema().FindTable(tableName).QualifiedName;
        var sqlBulkCopyOptions = BuildBulkCopyOptions(adapter);

        SqlConnection connection;
        SqlBulkCopy bulkCopy;
        if (transaction != null)
        {
            connection = (SqlConnection) transaction.Connection;
            bulkCopy = new SqlBulkCopy(connection, sqlBulkCopyOptions, (SqlTransaction) transaction);
        }
        else
        {
            connection = (SqlConnection) adapter.CreateConnection();
            bulkCopy = new SqlBulkCopy(connection, sqlBulkCopyOptions, null);
        }

        // MaybeDisposable() is deliberately evaluated before OpenIfClosed(), exactly as before.
        // NOTE(review): presumably it decides from the connection's current state whether this
        // method owns (and must dispose) the connection - confirm against its implementation.
        using (connection.MaybeDisposable())
        using (bulkCopy)
        {
            bulkCopy.DestinationTableName = destinationTableName;
            connection.OpenIfClosed();

            var dataList = data.ToList();
            if (dataList.Count == 0)
            {
                // Nothing to insert. Previously this path dereferenced a DataTable that was
                // never created and failed with a NullReferenceException.
                return null;
            }

            // The column set is the union of the keys of all records, so it is built once up front.
            var dataTable = CreateDataTable(adapter, tableName, dataList.SelectMany(r => r.Keys).Distinct(), bulkCopy);

            foreach (var record in dataList)
            {
                AddRow(dataTable, record);
                if (dataTable.Rows.Count >= BatchSize)
                {
                    bulkCopy.WriteToServer(dataTable);
                    dataTable.Clear();
                }
            }

            // Flush the final, partially filled batch.
            if (dataTable.Rows.Count > 0)
            {
                bulkCopy.WriteToServer(dataTable);
            }
        }

        return null;
    }

    /// <summary>
    /// Builds the <see cref="SqlBulkCopyOptions"/> for the copy: <c>Default</c>, plus
    /// <c>FireTriggers</c> when the adapter's ADO options request triggers on bulk inserts.
    /// </summary>
    private SqlBulkCopyOptions BuildBulkCopyOptions(AdoAdapter adapter)
    {
        var options = SqlBulkCopyOptions.Default;
        if (adapter.AdoOptions != null && adapter.AdoOptions.FireTriggersOnBulkInserts)
        {
            options |= SqlBulkCopyOptions.FireTriggers;
        }
        return options;
    }

    /// <summary>
    /// Creates an empty <see cref="DataTable"/> with one column per entry in <paramref name="keys"/>
    /// and registers a column mapping on <paramref name="bulkCopy"/> for every key that matches a
    /// column of the destination table.
    /// </summary>
    private DataTable CreateDataTable(AdoAdapter adapter, string tableName, IEnumerable<string> keys, SqlBulkCopy bulkCopy)
    {
        var table = adapter.GetSchema().FindTable(tableName);
        var dataTable = new DataTable(table.ActualName);

        foreach (var key in keys)
        {
            if (table.HasColumn(key))
            {
                var column = (SqlColumn)table.FindColumn(key);
                dataTable.Columns.Add(column.ActualName, column.SqlDbType.ToClrType());
                bulkCopy.ColumnMappings.Add(column.ActualName, column.ActualName);
            }
            else
            {
                // For non-matching columns, add a dummy DataColumn to make inserting rows easier.
                // It gets a random name and no column mapping, so it is never sent to the server.
                dataTable.Columns.Add(Guid.NewGuid().ToString("N"));
            }
        }

        return dataTable;
    }

    /// <summary>
    /// Appends one row to <paramref name="dataTable"/>, copying each value of
    /// <paramref name="record"/> whose key matches a column name of the table.
    /// Columns without a matching key keep the new row's default value.
    /// </summary>
    private void AddRow(DataTable dataTable, IDictionary<string, object> record)
    {
        // NOTE(review): columns are named after the schema's ActualName, while records are looked
        // up by that same name; a record keyed with a different spelling only matches if the
        // dictionary's key comparer treats the two as equal - confirm against callers.
        var dataRow = dataTable.NewRow();
        foreach (DataColumn column in dataTable.Columns)
        {
            object value;
            if (record.TryGetValue(column.ColumnName, out value))
            {
                // A CLR null cannot be stored in a value-typed DataColumn; DBNull is the
                // representation of a database NULL that DataRow accepts for every column type.
                dataRow[column] = value ?? DBNull.Value;
            }
        }
        dataTable.Rows.Add(dataRow);
    }
}
}