Wednesday, March 12, 2014

Learning PostgreSql: bulk loading data

In this post we shall start loading data in bulk.
For better performance of inserts, we shall load data into a table without constraints and indexes. This sounds familiar. There is a bulk copy utility, and it is very easy to invoke from C#. The following code feeds the output from a T-SQL stored procedure into a PostgreSql table:
// Streams the result set of a T-SQL stored procedure into a PostgreSql table.
// pgTableTarget is the destination (a COPY ... FROM STDIN wrapper); conn is the SQL Server source.
using (var pgTableTarget = new PgTableTarget(PgConnString, "Data.MyPgTable", GetColumns()))
using (var conn = new SqlConnection(connectionString))
{
    conn.Open();
    using (var command = conn.CreateCommand())
    {
        command.CommandText = "EXEC MyStoredProc";
        command.CommandType = CommandType.Text;
        // A timeout of 0 removes the command time limit, so a long-running procedure is not cut off.
        command.CommandTimeout = 0;
        using (var dr = command.ExecuteReader())
        {
            // One lower-cased type name per result-set column, taken from the reader's schema table.
            var columnTypes = SetFields(dr.GetSchemaTable());
            var adapter = new SqlDataReaderToPgTableAdapter(pgTableTarget);
            while (dr.Read())
            {
                // Copy the current row column by column, then terminate the row on the target.
                for (var columnIndex = 0; columnIndex < columnTypes.Count; columnIndex++)
                {
                    adapter.AddValue(dataReader: dr, columnIndex: columnIndex, columnType: columnTypes[columnIndex]);
                }
                pgTableTarget.EndRow();
            }
        }
    }
}

(snip)
/// <summary>
/// Zero-based index of the schema-table column from which <see cref="SetFields"/> reads the type name.
/// </summary>
// NOTE(review): presumably this is the "DataTypeName" column of SqlDataReader.GetSchemaTable() - confirm.
private const int TYPE = 24;

/// <summary>
/// Builds the list of column type names for a result set.
/// </summary>
/// <param name="schema">Schema table with one row per result-set column.</param>
/// <returns>
/// For every row of <paramref name="schema"/>, in row order, the lower-cased text of the value
/// stored at column index <see cref="TYPE"/>.
/// </returns>
internal static List<string> SetFields(DataTable schema)
{
    // The names are later compared against literals such as "int" and "varchar", so they are
    // lower-cased with the invariant culture; a culture-sensitive ToLower() would turn "INT"
    // into a different string under e.g. a Turkish thread culture.
    return (from DataRow dataRow in schema.Rows
            select dataRow[TYPE].ToString().ToLowerInvariant()).ToList();
}
 
This code uses a couple of helper classes we developed ourselves. Here is a simple wrapper around the COPY utility:

using System;using System.Collections.Generic;using System.Data;using Npgsql;
namespace Drw.Qr.FinDb.PostgresDataLoad
{
    public class PgTableTarget IDisposable
    {
        private readonly Npgsql.NpgsqlConnection _conn;
        
private readonly NpgsqlCommand _command;
        
private readonly NpgsqlCopySerializer _serializer;
        
private readonly NpgsqlCopyIn _copyIn;

        
public PgTableTarget(string connStringstring tableNameIEnumerable<stringcolumns)
        
{
            _conn = new NpgsqlConnection(connString);
            
_conn.Open();
            
_command _conn.CreateCommand();
            
var copyStr = string.Format("COPY {0}({1}) FROM STDIN"tableNamestring.Join(","columns));
            
_command.CommandText copyStr;
            
_command.CommandType CommandType.Text;
            
_serializer = new NpgsqlCopySerializer(_conn);
            
_copyIn = new NpgsqlCopyIn(_command_conn_serializer.ToStream);
            
_copyIn.Start();
        
}

        public void AddString(string value)
        
{
            _serializer.AddString(value);
        
}

        public void AddNull()
        
{
            _serializer.AddNull();
        
}

        public void AddInt32(int value)
        
{
            _serializer.AddInt32(value);
        
}

        public void AddNumber(double value)
        
{
            _serializer.AddNumber(value);
        
}

        public void EndRow()
        
{
            _serializer.EndRow();
            
_serializer.Flush();
        
}

        public void Dispose()
        
{
            _copyIn.End();
            
_serializer.Flush();
            
_serializer.Close();
            
_command.Dispose();
            
_conn.Dispose();
        
}
    }
}
  

The following code is a straightforward adapter:

using System;using System.Data.SqlClient;
namespace Drw.Qr.FinDb.PostgresDataLoad
{
    public class SqlDataReaderToPgTableAdapter
    {
        private readonly PgTableTarget _pgTableTarget;

        
public SqlDataReaderToPgTableAdapter(PgTableTarget pgTableTarget)
        
{
            _pgTableTarget pgTableTarget;           
        
}

        public void AddValue(SqlDataReader dataReader,
                             
int columnIndexstring columnType)
        
{
            if (dataReader.IsDBNull(columnIndex))
            
{
                _pgTableTarget.AddNull();
                
return;
            
}
            switch (columnType)
            
{
                case "varchar":
                
case "char":
                    
_pgTableTarget.AddString(dataReader.GetString(columnIndex));
                    
break;
                
case "decimal":
                    
_pgTableTarget.AddNumber((double)dataReader.GetDecimal(columnIndex));
                    
break;
                
case "int":
                    
_pgTableTarget.AddInt32(dataReader.GetInt32(columnIndex));
                    
break;
                
case "smallint":
                    
_pgTableTarget.AddInt32(dataReader.GetInt16(columnIndex));
                    
break;
                
case "real":
                    
_pgTableTarget.AddNumber(dataReader.GetFloat(columnIndex));
                    
break;
                
case "float":
                    
_pgTableTarget.AddNumber(dataReader.GetDouble(columnIndex));
                    
break;
                
default:
                    throw new ArgumentException("Not supported type: " columnType);
            
}           
        }
    }
}
  

Although neither class supports all the available types, they do support all the types we need for this small project.
As I was typing this post, the code has already moved over several million rows. 

No comments:

Post a Comment