Skip to content

Commit acdf092

Browse files
committed
SQL to SQL data transform, SMO capability, Server transfer
1 parent cdcae01 commit acdf092

34 files changed

+631
-126
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
### SQL to MongoDb Transform Tool
1+
### SQL to SQL or SQL to MongoDb Transform Tool
22

3-
NetCoreStack DataTransform is a tool for application developers, data analysts and database administrators to transform and collect various data type from different SQL providers to MongoDb database.
3+
NetCoreStack DataTransform is a tool for application developers, data analysts and database administrators to transform and collect various data types from different SQL providers into a SQL or MongoDb database.
44

55
![](https://github.com/NetCoreStack/DataTransform/blob/master/DataTransform.png)
66

@@ -26,4 +26,4 @@ Use Docker to get SQL Server and MongoDb running instances.
2626

2727
docker run -p 1401:1433 --name sql1 -d localsql
2828

29-
when above commands are completed you can run **afterdockerbuild** file to retore the sample MusicStore database for running SQL Server 2017 instance.
29+
Once the above commands have completed, you can run the **afterdockerbuild** file to restore the sample MusicStore database on the running SQL Server 2017 instance.

src/DataTransform.Api.Hosting/Controllers/TransformController.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Collections.Generic;
88
using System.IO;
99
using System.Linq;
10+
using System.Threading;
1011
using System.Threading.Tasks;
1112

1213
namespace DataTransform.Api.Hosting.Controllers
@@ -141,7 +142,7 @@ public async Task<IActionResult> StartTransformAsync([FromQuery] string[] files)
141142
var transformManager = HttpContext.RequestServices.GetRequiredService<TransformManager>();
142143

143144
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
144-
Task.Factory.StartNew(async () => await transformManager.TransformAsync(files), TaskCreationOptions.LongRunning);
145+
Task.Factory.StartNew(async () => await transformManager.TransformAsync(files, CancellationToken.None), TaskCreationOptions.LongRunning);
145146
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
146147

147148
await Task.CompletedTask;

src/DataTransform.Api.Hosting/Core/DefaultDataContextConfigurationAccessor.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/DataTransform.Api.Hosting/Core/DbTransformTask.cs renamed to src/DataTransform.Api.Hosting/Core/MongoDbTransformTask.cs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
using Dapper;
22
using MongoDB.Bson;
33
using MongoDB.Driver;
4-
using NetCoreStack.Data;
5-
using NetCoreStack.Data.Context;
6-
using NetCoreStack.Data.Interfaces;
74
using NetCoreStack.WebSockets;
85
using System;
96
using System.Collections.Generic;
@@ -14,35 +11,29 @@
1411

1512
namespace DataTransform.Api.Hosting
1613
{
17-
public class DbTransformTask : ITransformTask
14+
public class MongoDbTransformTask : ITransformTask
1815
{
1916
private readonly SqlDatabase _sourceSqlDatabase;
20-
private readonly IMongoDbDataContext _mongoDbDataContext;
17+
private readonly MongoDbContext _mongoDbDataContext;
2118
private readonly TransformOptions _options;
22-
private readonly ICollectionNameSelector _collectionNameSelector;
2319
private readonly IConnectionManager _connectionManager;
24-
private readonly CancellationTokenSource _cancellationToken;
20+
private readonly CancellationToken _cancellationToken;
2521

26-
public List<DbTransformContext> DbTransformContexts { get; }
22+
public List<MongoDbTransformContext> DbTransformContexts { get; }
2723

28-
public DbTransformTask(TransformOptions options,
29-
ICollectionNameSelector collectionNameSelector,
30-
IConnectionManager connectionManager,
31-
CancellationTokenSource cancellationToken)
24+
public MongoDbTransformTask(TransformOptions options, IConnectionManager connectionManager, CancellationToken cancellationToken)
3225
{
3326
_options = options ?? throw new ArgumentNullException(nameof(options));
34-
_collectionNameSelector = collectionNameSelector ?? throw new ArgumentNullException(nameof(collectionNameSelector));
3527
_connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
36-
_cancellationToken = cancellationToken ?? throw new ArgumentNullException(nameof(cancellationToken));
28+
_cancellationToken = cancellationToken;
3729

38-
var dataContextConfigurationAccessor = new DefaultDataContextConfigurationAccessor(options);
39-
_sourceSqlDatabase = new SqlDatabase(dataContextConfigurationAccessor);
40-
_mongoDbDataContext = new MongoDbContext(dataContextConfigurationAccessor, _collectionNameSelector, null);
30+
_sourceSqlDatabase = new SqlDatabase(options.SourceConnectionString);
31+
_mongoDbDataContext = new MongoDbContext(options.TargetConnectionString);
4132

42-
DbTransformContexts = options.CreateTransformContexts(_cancellationToken);
33+
DbTransformContexts = options.CreateMongoDbTransformContexts(_cancellationToken);
4334
}
4435

45-
private long GetCount(DbTransformContext context)
36+
private long GetCount(MongoDbTransformContext context)
4637
{
4738
List<dynamic> sqlItems = new List<dynamic>();
4839
using (var connection = _sourceSqlDatabase.CreateConnection())
@@ -51,7 +42,7 @@ private long GetCount(DbTransformContext context)
5142
}
5243
}
5344

54-
private async Task<int> TokenizeLoopAsync(DbTransformContext context)
45+
private async Task<int> TokenizeLoopAsync(MongoDbTransformContext context)
5546
{
5647
int take = context.BundleSize;
5748
object indexId = context.LastIndexId;
@@ -62,7 +53,7 @@ private async Task<int> TokenizeLoopAsync(DbTransformContext context)
6253

6354
do
6455
{
65-
if (context.CancellationTokenSource.IsCancellationRequested)
56+
if (context.CancellationToken.IsCancellationRequested)
6657
{
6758
break;
6859
}
@@ -91,7 +82,7 @@ private async Task<int> TokenizeLoopAsync(DbTransformContext context)
9182
IsOrdered = false
9283
});
9384

94-
await _connectionManager.WsLogAsync($"SQL Table: {context.TableName} total: {totalIndices} record(s) progressed.");
85+
await _connectionManager.WsLogAsync($"Table: {context.TableName} total: {totalIndices} record(s) progressed.");
9586
}
9687

9788
if (totalIndices == 0)
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
using Dapper;
2+
using Microsoft.SqlServer.Management.Common;
3+
using Microsoft.SqlServer.Management.Smo;
4+
using MongoDB.Driver;
5+
using NetCoreStack.WebSockets;
6+
using System;
7+
using System.Collections.Generic;
8+
using System.Data;
9+
using System.Data.SqlClient;
10+
using System.Diagnostics;
11+
using System.Linq;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
15+
namespace DataTransform.Api.Hosting
16+
{
17+
/// <summary>
/// Transform task that copies rows from a source SQL database into a target SQL
/// database, one configured table at a time, in batches of
/// <c>SqlTransformContext.BundleSize</c> rows ordered by the identity column.
/// </summary>
public class SqlTransformTask : ITransformTask
{
    private readonly SqlDatabase _sourceSqlDatabase;
    private readonly SqlDatabase _targetSqlDatabase;
    private readonly TransformOptions _options;
    private readonly IConnectionManager _connectionManager;
    private readonly CancellationToken _cancellationToken;
    private readonly SqlConnectionStringBuilder _targetSqlConnectionBuilder;

    /// <summary>
    /// The per-table transform contexts created from the options passed to the constructor.
    /// </summary>
    public List<SqlTransformContext> DbTransformContexts { get; }

    /// <summary>
    /// Creates the task and builds the source/target database wrappers from the
    /// connection strings carried by <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Transform options (source/target connection strings, table contexts).</param>
    /// <param name="connectionManager">Used to push progress and error messages to connected clients.</param>
    /// <param name="cancellationToken">Token handed to every created transform context.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="connectionManager"/> is null.
    /// </exception>
    public SqlTransformTask(TransformOptions options, IConnectionManager connectionManager, CancellationToken cancellationToken)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
        _cancellationToken = cancellationToken;

        _sourceSqlDatabase = new SqlDatabase(options.SourceConnectionString);
        _targetSqlDatabase = new SqlDatabase(options.TargetConnectionString);

        _targetSqlConnectionBuilder = new SqlConnectionStringBuilder(options.TargetConnectionString);

        DbTransformContexts = options.CreateSqlTransformContexts(_cancellationToken);
    }

    /// <summary>
    /// Runs the context's <c>CountSql</c> against the source database and returns the scalar result.
    /// </summary>
    private long GetCount(SqlTransformContext context)
    {
        using (var connection = _sourceSqlDatabase.CreateConnection())
        {
            return connection.ExecuteScalar<long>(context.CountSql);
        }
    }

    /// <summary>
    /// Builds an empty <see cref="DataTable"/> whose columns mirror the SMO column
    /// definitions of the context's table, restricted to the configured field pattern.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The database or the table could not be resolved through SMO.
    /// </exception>
    private DataTable CreateBufferTable(SqlTransformContext context)
    {
        List<string> fields = context.FieldPattern == "*"
            ? new List<string>(new[] { "*" })
            : context.FieldPattern.Split(',').Select(f => f.Trim()).ToList();
        bool allFields = fields.Contains("*");

        var dataTable = new DataTable();
        using (var connection = _sourceSqlDatabase.CreateConnection())
        {
            // NOTE(review): the SMO server is opened on the SOURCE connection but the
            // database is looked up by the TARGET catalog name. This only resolves when
            // a database with that name is visible on the source server - confirm intent.
            string targetDatabaseName = _targetSqlConnectionBuilder.InitialCatalog;
            Server server = new Server(new ServerConnection(connection));
            var database = server.Databases[targetDatabaseName];
            if (database == null)
            {
                throw new InvalidOperationException($"Database '{targetDatabaseName}' could not be found.");
            }

            // SMO indexes tables by bare name, so drop any schema qualifier ("dbo.Album" -> "Album").
            var tableName = context.TableName;
            if (tableName.Contains("."))
            {
                tableName = tableName.Split('.').Last();
            }

            var table = database.Tables[tableName];
            if (table == null)
            {
                throw new InvalidOperationException($"Table '{tableName}' could not be found in database '{targetDatabaseName}'.");
            }

            for (int i = 0; i < table.Columns.Count; i++)
            {
                var column = table.Columns[i];
                if (allFields || fields.Contains(column.Name))
                {
                    dataTable.Columns.Add(new DataColumn(column.Name, column.DataType.SqlDataType.GetClrType()));
                }
            }
        }

        return dataTable;
    }

    /// <summary>
    /// Copies the rows of one table in batches: each batch selects the next
    /// <c>BundleSize</c> rows whose identity value is greater than the last copied one
    /// and bulk-inserts them into the target table, keeping identity values.
    /// </summary>
    /// <param name="context">Table context; <c>Count</c> and <c>LastIndexId</c> must already be set.</param>
    /// <returns>The number of rows copied by this call.</returns>
    private async Task<int> TokenizeLoopAsync(SqlTransformContext context)
    {
        int take = context.BundleSize;
        object indexId = context.LastIndexId;
        int totalIndices = 0;
        var identityColumnName = context.IdentityColumnName;

        DataTable dataTable = CreateBufferTable(context);

        do
        {
            if (context.CancellationToken.IsCancellationRequested)
            {
                break;
            }

            // NOTE(review): table, field and identity names cannot be SQL parameters and are
            // interpolated as configured; they must come from trusted configuration only.
            var predicateSql = $"SELECT TOP {take} {context.FieldPattern} FROM {context.TableName} " +
                $"WHERE {identityColumnName} > {indexId} ORDER BY {identityColumnName} ASC";

            using (var connection = _sourceSqlDatabase.CreateConnection())
            using (var cmd = new SqlCommand(predicateSql, connection))
            using (var da = new SqlDataAdapter(cmd))
            {
                da.Fill(dataTable);
            }

            var itemsCount = dataTable.Rows.Count;
            if (itemsCount == 0)
            {
                // The source has no rows past indexId. Without this exit a resumed transfer
                // (or a source that shrank after counting) would never reach context.Count
                // and the loop would spin forever on empty batches.
                break;
            }

            totalIndices += itemsCount;

            // Remember the highest identity copied so far; the next batch starts after it.
            DataRow lastRow = dataTable.Rows[itemsCount - 1];
            indexId = lastRow[identityColumnName];

            using (var connection = _targetSqlDatabase.CreateConnection())
            using (var sqlBulkCopy = new SqlBulkCopy(connection.ConnectionString, SqlBulkCopyOptions.KeepIdentity))
            {
                sqlBulkCopy.DestinationTableName = context.TableName;
                sqlBulkCopy.SqlRowsCopied += SqlBulkCopySqlRowsCopied;
                await sqlBulkCopy.WriteToServerAsync(dataTable);
            }

            // Reuse the same buffer (and its column schema) for the next batch.
            dataTable.Clear();

            await _connectionManager.WsLogAsync($"Table: {context.TableName} total: {totalIndices} record(s) progressed.");

        } while (totalIndices < context.Count);

        return totalIndices;
    }

    /// <summary>
    /// Bulk-copy progress hook; intentionally does nothing.
    /// </summary>
    private void SqlBulkCopySqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
    {
        return;
    }

    /// <summary>
    /// Delegates to <c>SqlQueryHelper.EnsureTargetStates</c> before any table is copied.
    /// </summary>
    private void EnsureDatabase()
    {
        SqlQueryHelper.EnsureTargetStates(_options);
    }

    /// <summary>
    /// Runs the transfer for every configured table and reports the total row count and
    /// elapsed time through the connection manager. A failure while copying one table is
    /// logged and does not stop the remaining tables.
    /// </summary>
    public async Task InvokeAsync()
    {
        EnsureDatabase();

        Stopwatch sw = Stopwatch.StartNew();
        int totalRecords = 0;

        foreach (var context in DbTransformContexts)
        {
            // Skip the remaining tables (and their count / latest-id queries) once cancelled.
            if (_cancellationToken.IsCancellationRequested)
            {
                break;
            }

            object lastIndexId = SqlQueryHelper.GetLatestId(_targetSqlDatabase, context);
            long count = GetCount(context);
            try
            {
                context.Count = count;
                context.LastIndexId = lastIndexId;
                totalRecords += await TokenizeLoopAsync(context);
            }
            catch (Exception ex)
            {
                await _connectionManager.WsErrorLog(ex);
            }
        }

        sw.Stop();
        await _connectionManager.WsLogAsync(string.Format("Transformed total records: {0} time elapsed: {1}", totalRecords, sw.Elapsed));
    }
}
180+
}

src/DataTransform.Api.Hosting/Core/TransformManager.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,18 @@ public class TransformManager
1212
private readonly IConnectionManager _connectionManager;
1313
private readonly TransformTaskFactory _transformTaskFactory;
1414

15-
public CancellationTokenSource CancellationToken { get; }
16-
1715
public TransformManager(TransformTaskFactory transformTaskFactory, IConnectionManager connectionManager)
1816
{
1917
_transformTaskFactory = transformTaskFactory ?? throw new ArgumentNullException(nameof(transformTaskFactory));
2018
_connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
21-
CancellationToken = new CancellationTokenSource();
2219
}
2320

24-
public async Task TransformAsync(string[] files)
25-
{
26-
21+
public async Task TransformAsync(string[] files, CancellationToken cancellationToken)
22+
{
2723
SharedSemaphoreSlim.Wait();
2824
try
2925
{
30-
var tasks = _transformTaskFactory.Create(files, CancellationToken);
26+
var tasks = _transformTaskFactory.Create(files, cancellationToken);
3127
await Task.WhenAll(tasks.Select(t => t.InvokeAsync()));
3228
}
3329
catch (Exception ex)

0 commit comments

Comments
 (0)