In this post we shall start loading data in bulk.
For better performance of inserts, we shall load data into a table without constraints and indexes. This sounds familiar. There is a bulk copy utility, and it is very easy to invoke from C#. The following code feeds the output from a T-SQL stored procedure into a PostgreSQL table:
// Streams every row returned by the T-SQL stored procedure into the
// PostgreSQL table "Data.MyPgTable" through the COPY-based PgTableTarget.
using (var pgTableTarget = new PgTableTarget(PgConnString, "Data.MyPgTable", GetColumns()))
using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();

    using (var sqlCommand = sqlConnection.CreateCommand())
    {
        sqlCommand.CommandText = "EXEC MyStoredProc";
        sqlCommand.CommandType = CommandType.Text;
        // 0 means "no limit": the procedure may run for as long as it needs.
        sqlCommand.CommandTimeout = 0;

        using (var reader = sqlCommand.ExecuteReader())
        {
            var rowWriter = new SqlDataReaderToPgTableAdapter(pgTableTarget);

            // One lower-cased SQL type name per result-set column, in column order.
            var typeNames = SetFields(reader.GetSchemaTable());
            var columnCount = typeNames.Count;

            while (reader.Read())
            {
                // Copy the current row field by field, then terminate the row.
                for (var i = 0; i < columnCount; i++)
                {
                    rowWriter.AddValue(reader, i, typeNames[i]);
                }

                pgTableTarget.EndRow();
            }
        }
    }
}
(snip)
/// <summary>
/// Fallback ordinal of the column holding the SQL type name in the schema
/// table; used only when no column named "DataTypeName" is present.
/// </summary>
private const int TYPE = 24;

/// <summary>
/// Extracts the lower-cased SQL type name of every column described by a
/// data reader's schema table.
/// </summary>
/// <param name="schema">
/// Schema table with one row per result-set column, e.g. the value returned
/// by <c>SqlDataReader.GetSchemaTable()</c>.
/// </param>
/// <returns>One lower-cased type name per schema row, in row order.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="schema"/> is null.
/// </exception>
internal static List<string> SetFields(DataTable schema)
{
    if (schema == null)
    {
        throw new System.ArgumentNullException("schema");
    }

    // Prefer the documented column name over a magic ordinal; keep the
    // historical ordinal as a fallback so existing callers behave the same.
    const string typeNameColumn = "DataTypeName";
    var typeOrdinal = schema.Columns.Contains(typeNameColumn)
        ? schema.Columns[typeNameColumn].Ordinal
        : TYPE;

    var typeNames = new List<string>(schema.Rows.Count);
    foreach (DataRow row in schema.Rows)
    {
        // Invariant lower-casing: the result is compared against fixed
        // identifiers such as "int", so it must not depend on the current
        // culture (under tr-TR, "INT".ToLower() yields a dotless i).
        typeNames.Add(row[typeOrdinal].ToString().ToLowerInvariant());
    }

    return typeNames;
}
This code uses a couple of helper classes we developed ourselves. Here is a simple wrapper around the COPY utility:
using System;using System.Collections.Generic;using System.Data;using Npgsql;
namespace Drw.Qr.FinDb.PostgresDataLoad
{
/// <summary>
/// Write-only target that streams rows into a PostgreSQL table using
/// <c>COPY ... FROM STDIN</c> via Npgsql's <c>NpgsqlCopyIn</c> and
/// <c>NpgsqlCopySerializer</c>. The COPY operation is started by the
/// constructor and ended by <see cref="Dispose"/>.
/// </summary>
public class PgTableTarget : IDisposable
{
    private readonly Npgsql.NpgsqlConnection _conn;
    private readonly NpgsqlCommand _command;
    private readonly NpgsqlCopySerializer _serializer;
    private readonly NpgsqlCopyIn _copyIn;
    private bool _disposed;

    /// <summary>
    /// Opens a connection and starts a COPY into the given table.
    /// </summary>
    /// <param name="connString">Npgsql connection string.</param>
    /// <param name="tableName">
    /// Target table name; inserted verbatim into the COPY statement, so it
    /// must come from a trusted source.
    /// </param>
    /// <param name="columns">
    /// Target column names, in the order values will be added; also inserted
    /// verbatim into the COPY statement.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tableName"/> or <paramref name="columns"/> is null.
    /// </exception>
    public PgTableTarget(string connString, string tableName, IEnumerable<string> columns)
    {
        if (tableName == null)
        {
            throw new ArgumentNullException("tableName");
        }
        if (columns == null)
        {
            throw new ArgumentNullException("columns");
        }

        try
        {
            _conn = new NpgsqlConnection(connString);
            _conn.Open();
            _command = _conn.CreateCommand();
            var copyStr = string.Format("COPY {0}({1}) FROM STDIN", tableName, string.Join(",", columns));
            _command.CommandText = copyStr;
            _command.CommandType = CommandType.Text;
            _serializer = new NpgsqlCopySerializer(_conn);
            _copyIn = new NpgsqlCopyIn(_command, _conn, _serializer.ToStream);
            _copyIn.Start();
        }
        catch
        {
            // The caller never receives the instance when construction fails,
            // so nobody else can release what was already acquired.
            if (_command != null)
            {
                _command.Dispose();
            }
            if (_conn != null)
            {
                _conn.Dispose();
            }
            throw;
        }
    }

    /// <summary>Adds a string field to the current row.</summary>
    /// <param name="value">Field value passed to the serializer.</param>
    public void AddString(string value)
    {
        _serializer.AddString(value);
    }

    /// <summary>Adds a NULL field to the current row.</summary>
    public void AddNull()
    {
        _serializer.AddNull();
    }

    /// <summary>Adds a 32-bit integer field to the current row.</summary>
    /// <param name="value">Field value passed to the serializer.</param>
    public void AddInt32(int value)
    {
        _serializer.AddInt32(value);
    }

    /// <summary>Adds a floating-point field to the current row.</summary>
    /// <param name="value">Field value passed to the serializer.</param>
    public void AddNumber(double value)
    {
        _serializer.AddNumber(value);
    }

    /// <summary>
    /// Ends the current row and flushes the serializer, so each completed
    /// row is handed to the COPY stream immediately.
    /// </summary>
    public void EndRow()
    {
        _serializer.EndRow();
        _serializer.Flush();
    }

    /// <summary>
    /// Ends the COPY operation and releases the command and connection.
    /// Safe to call more than once; only the first call has any effect.
    /// </summary>
    /// <remarks>
    /// A failure while ending the COPY still propagates to the caller, but
    /// the command and connection are released regardless.
    /// </remarks>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        try
        {
            _copyIn.End();
            _serializer.Flush();
            _serializer.Close();
        }
        finally
        {
            // Always release the connection, even when ending the COPY fails.
            _command.Dispose();
            _conn.Dispose();
        }
    }
}
}
The following code is a straightforward adapter:
using System;using System.Data.SqlClient;
namespace Drw.Qr.FinDb.PostgresDataLoad
{
/// <summary>
/// Copies individual field values from a <see cref="SqlDataReader"/> row
/// into a <see cref="PgTableTarget"/>, choosing the typed getter and writer
/// from the column's SQL Server type name.
/// </summary>
public class SqlDataReaderToPgTableAdapter
{
    private readonly PgTableTarget _pgTableTarget;

    /// <summary>Creates an adapter that writes into the given target.</summary>
    /// <param name="pgTableTarget">Target that receives every value.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="pgTableTarget"/> is null.
    /// </exception>
    public SqlDataReaderToPgTableAdapter(PgTableTarget pgTableTarget)
    {
        if (pgTableTarget == null)
        {
            throw new ArgumentNullException("pgTableTarget");
        }

        _pgTableTarget = pgTableTarget;
    }

    /// <summary>
    /// Writes the value of one column of the reader's current row to the target.
    /// </summary>
    /// <param name="dataReader">Reader positioned on the row to copy.</param>
    /// <param name="columnIndex">Zero-based ordinal of the column to copy.</param>
    /// <param name="columnType">
    /// Lower-case SQL Server type name of the column. Supported values:
    /// varchar, char, nvarchar, nchar, decimal, int, smallint, real, float.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="dataReader"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="columnType"/> is not one of the supported type names.
    /// </exception>
    public void AddValue(SqlDataReader dataReader,
        int columnIndex, string columnType)
    {
        if (dataReader == null)
        {
            throw new ArgumentNullException("dataReader");
        }

        // NULLs are written as NULL regardless of the column type.
        if (dataReader.IsDBNull(columnIndex))
        {
            _pgTableTarget.AddNull();
            return;
        }

        switch (columnType)
        {
            case "varchar":
            case "char":
            case "nvarchar":
            case "nchar":
                _pgTableTarget.AddString(dataReader.GetString(columnIndex));
                break;
            case "decimal":
                // Send the decimal as invariant-culture text instead of casting
                // to double: the cast silently rounds values that carry more
                // significant digits than a double can represent.
                // NOTE(review): assumes the serializer writes COPY text format,
                // where numeric fields are plain text - confirm for this Npgsql version.
                _pgTableTarget.AddString(
                    dataReader.GetDecimal(columnIndex)
                        .ToString(System.Globalization.CultureInfo.InvariantCulture));
                break;
            case "int":
                _pgTableTarget.AddInt32(dataReader.GetInt32(columnIndex));
                break;
            case "smallint":
                _pgTableTarget.AddInt32(dataReader.GetInt16(columnIndex));
                break;
            case "real":
                _pgTableTarget.AddNumber(dataReader.GetFloat(columnIndex));
                break;
            case "float":
                _pgTableTarget.AddNumber(dataReader.GetDouble(columnIndex));
                break;
            default:
                throw new ArgumentException("Not supported type: " + columnType);
        }
    }
}
}
Although neither class supports all the available types, they do support all the types we need for this small project.
As I was typing this post, the code has already moved over several million rows.
No comments:
Post a Comment