In this post we shall start loading data in bulk.
To make inserts fast, we load the data into a table that has no constraints or indexes. This approach should sound familiar to anyone who has bulk-loaded data into SQL Server. PostgreSQL provides a bulk-load command, COPY, which is very easy to invoke from C#. The following code streams the output of a T-SQL stored procedure into a PostgreSQL table:
// Copy every row returned by MyStoredProc (SQL Server) into the PostgreSQL table
// Data.MyPgTable. The PostgreSQL target is created first, so it is disposed last:
// its Dispose ends the COPY only after the SQL Server reader and connection are closed.
using (var pgTableTarget = new PgTableTarget(PgConnString, "Data.MyPgTable", GetColumns()))
using (var conn = new SqlConnection(connectionString))
{
    conn.Open();
    using (var command = conn.CreateCommand())
    {
        command.CommandText = "EXEC MyStoredProc";
        command.CommandType = CommandType.Text;
        command.CommandTimeout = 0; // 0 = no client-side command timeout

        using (var reader = command.ExecuteReader())
        {
            // Type name of each result column, indexed by column ordinal.
            var typeNames = SetFields(reader.GetSchemaTable());
            var adapter = new SqlDataReaderToPgTableAdapter(pgTableTarget);
            var width = typeNames.Count;

            while (reader.Read())
            {
                for (var ordinal = 0; ordinal < width; ordinal++)
                {
                    adapter.AddValue(reader, ordinal, typeNames[ordinal]);
                }

                // Terminate the current row in the COPY stream.
                pgTableTarget.EndRow();
            }
        }
    }
}
(snip)
// Fallback ordinal of the type-name column in the schema table. It matches
// "DataTypeName" in SqlClient's GetSchemaTable() layout; verify if the provider changes.
private const int TYPE = 24;

// Name of the schema-table column holding the SQL Server type name (e.g. "int", "varchar").
private const string DataTypeNameColumn = "DataTypeName";

/// <summary>
/// Extracts the data type name of every result column, in column-ordinal order.
/// </summary>
/// <param name="schema">
/// Schema table returned by <c>GetSchemaTable()</c>; may be null when the reader has
/// no result set.
/// </param>
/// <returns>
/// Lower-cased (culture-invariant) type names, one per schema row; an empty list when
/// <paramref name="schema"/> is null.
/// </returns>
internal static List<string> SetFields(DataTable schema)
{
    var result = new List<string>();
    if (schema == null)
    {
        return result;
    }

    // Look the column up by name so the result does not depend on the column order of
    // the schema table; fall back to the historical ordinal if the name is absent.
    var typeColumn = schema.Columns.Contains(DataTypeNameColumn)
        ? schema.Columns.IndexOf(DataTypeNameColumn)
        : TYPE;

    foreach (DataRow row in schema.Rows)
    {
        // ToLowerInvariant: culture-sensitive ToLower() maps "I" to dotless "ı" under
        // tr-TR, which would break matching against names such as "int".
        result.Add(row[typeColumn].ToString().ToLowerInvariant());
    }
    return result;
}
This code uses a couple of helper classes we developed ourselves. Here is a simple wrapper around the COPY command:
using System;using System.Collections.Generic;using System.Data;using Npgsql;
namespace Drw.Qr.FinDb.PostgresDataLoad
{
    /// <summary>
    /// Streams rows into a PostgreSQL table through a single <c>COPY ... FROM STDIN</c>
    /// operation on a connection this instance owns. Append the values of a row with the
    /// <c>Add*</c> methods, then call <see cref="EndRow"/>. Disposing ends the COPY and
    /// closes the connection.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Dispose always ends the COPY, including when a using block unwinds
    /// because of an exception, so a failed load presumably keeps the rows sent so far.
    /// </remarks>
    public class PgTableTarget : IDisposable
    {
        private readonly NpgsqlConnection _conn;
        private readonly NpgsqlCommand _command;
        private readonly NpgsqlCopySerializer _serializer;
        private readonly NpgsqlCopyIn _copyIn;
        private bool _disposed;

        /// <summary>Opens a connection and starts <c>COPY tableName(columns) FROM STDIN</c>.</summary>
        /// <param name="connString">Npgsql connection string.</param>
        /// <param name="tableName">
        /// Target table, optionally schema-qualified (e.g. "Data.MyPgTable"). It is inserted
        /// into the SQL text verbatim, so it must come from trusted code, never user input.
        /// </param>
        /// <param name="columns">
        /// Target column names, in the order values are added for each row. They are also
        /// inserted into the SQL text verbatim.
        /// </param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public PgTableTarget(string connString, string tableName, IEnumerable<string> columns)
        {
            // Validate and build the statement before opening anything, so bad arguments
            // cannot leave an open connection behind.
            if (connString == null)
            {
                throw new ArgumentNullException("connString");
            }
            if (tableName == null)
            {
                throw new ArgumentNullException("tableName");
            }
            if (columns == null)
            {
                throw new ArgumentNullException("columns");
            }
            var copyStr = string.Format("COPY {0}({1}) FROM STDIN", tableName, string.Join(",", columns));

            _conn = new NpgsqlConnection(connString);
            try
            {
                _conn.Open();
                _command = _conn.CreateCommand();
                _command.CommandText = copyStr;
                _command.CommandType = CommandType.Text;
                _serializer = new NpgsqlCopySerializer(_conn);
                _copyIn = new NpgsqlCopyIn(_command, _conn, _serializer.ToStream);
                _copyIn.Start();
            }
            catch
            {
                // The caller never receives this instance, so nobody else can dispose these.
                if (_command != null)
                {
                    _command.Dispose();
                }
                _conn.Dispose();
                throw;
            }
        }

        /// <summary>Appends a text value to the current row.</summary>
        public void AddString(string value)
        {
            _serializer.AddString(value);
        }

        /// <summary>Appends a NULL to the current row.</summary>
        public void AddNull()
        {
            _serializer.AddNull();
        }

        /// <summary>Appends a 32-bit integer to the current row.</summary>
        public void AddInt32(int value)
        {
            _serializer.AddInt32(value);
        }

        /// <summary>Appends a double-precision number to the current row.</summary>
        public void AddNumber(double value)
        {
            _serializer.AddNumber(value);
        }

        /// <summary>
        /// Terminates the current row and flushes the serializer, pushing the completed row
        /// to the COPY stream.
        /// </summary>
        public void EndRow()
        {
            _serializer.EndRow();
            _serializer.Flush();
        }

        /// <summary>
        /// Completes the COPY, then releases the command and the connection. The connection
        /// is released even if completing the COPY throws. Calling Dispose again does nothing.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }
            _disposed = true;

            try
            {
                // Flush before End(), following the order of the Npgsql COPY sample; after
                // End() the COPY stream is presumably no longer writable.
                _serializer.Flush();
                _copyIn.End();
                _serializer.Close();
            }
            finally
            {
                _command.Dispose();
                _conn.Dispose();
            }
        }
    }
}
The following code is a straightforward adapter that maps each SQL Server column type to the matching method of the wrapper:
using System;using System.Data.SqlClient;
namespace Drw.Qr.FinDb.PostgresDataLoad
{
    /// <summary>
    /// Copies individual column values from a <see cref="SqlDataReader"/> row into a
    /// <see cref="PgTableTarget"/>. The write method is chosen from the column's SQL Server
    /// type name.
    /// </summary>
    public class SqlDataReaderToPgTableAdapter
    {
        private readonly PgTableTarget _pgTableTarget;

        /// <param name="pgTableTarget">Target that receives the values.</param>
        /// <exception cref="ArgumentNullException"><paramref name="pgTableTarget"/> is null.</exception>
        public SqlDataReaderToPgTableAdapter(PgTableTarget pgTableTarget)
        {
            if (pgTableTarget == null)
            {
                throw new ArgumentNullException("pgTableTarget");
            }
            _pgTableTarget = pgTableTarget;
        }

        /// <summary>
        /// Writes the value at <paramref name="columnIndex"/> of the reader's current row to
        /// the target. A database NULL is written as NULL whatever the type.
        /// </summary>
        /// <param name="dataReader">Reader positioned on a row.</param>
        /// <param name="columnIndex">Zero-based column ordinal.</param>
        /// <param name="columnType">Lower-case SQL Server type name of the column (e.g. "int", "varchar").</param>
        /// <exception cref="ArgumentNullException"><paramref name="dataReader"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="columnType"/> is not supported.</exception>
        public void AddValue(SqlDataReader dataReader,
                             int columnIndex, string columnType)
        {
            if (dataReader == null)
            {
                throw new ArgumentNullException("dataReader");
            }
            if (dataReader.IsDBNull(columnIndex))
            {
                _pgTableTarget.AddNull();
                return;
            }
            switch (columnType)
            {
                case "varchar":
                case "char":
                case "nvarchar":
                case "nchar":
                    _pgTableTarget.AddString(dataReader.GetString(columnIndex));
                    break;
                case "decimal":
                case "numeric":
                    // Send the exact decimal text instead of casting to double, which silently
                    // drops digits beyond roughly 15-17 significant figures. The COPY runs in
                    // text format, so the server parses this text for the target column.
                    _pgTableTarget.AddString(
                        dataReader.GetDecimal(columnIndex).ToString(System.Globalization.CultureInfo.InvariantCulture));
                    break;
                case "bigint":
                    // PgTableTarget has no 64-bit writer; the invariant text form is exact.
                    _pgTableTarget.AddString(
                        dataReader.GetInt64(columnIndex).ToString(System.Globalization.CultureInfo.InvariantCulture));
                    break;
                case "int":
                    _pgTableTarget.AddInt32(dataReader.GetInt32(columnIndex));
                    break;
                case "smallint":
                    _pgTableTarget.AddInt32(dataReader.GetInt16(columnIndex));
                    break;
                case "tinyint":
                    _pgTableTarget.AddInt32(dataReader.GetByte(columnIndex));
                    break;
                case "real":
                    _pgTableTarget.AddNumber(dataReader.GetFloat(columnIndex));
                    break;
                case "float":
                    _pgTableTarget.AddNumber(dataReader.GetDouble(columnIndex));
                    break;
                default:
                    throw new ArgumentException("Not supported type: " + columnType);
            }
        }
    }
}
Neither class supports every available type, but together they support all the types we need for this small project.
While I was typing this post, the code had already moved several million rows.
 
No comments:
Post a Comment