Lesson Learned #234: Parallel vs Single Running a Bulk Insert with Python

This post has been republished via RSS; it originally appeared at: New blog articles in Microsoft Tech Community.

Today, I worked on a service request in which our customer wanted to improve the performance of a bulk insert process. Below, I would like to share my experience working on it.

 

Our customer mentioned that inserting data (100,000 rows) was taking 14 seconds in a Business Critical database. I was able to reproduce this time using a single thread and a table with 20 columns.

 

To improve this Python code, I suggested running the bulk insert in parallel, one batch of 10,000 rows at a time. I also followed these best practices to reduce the execution time of the process:

 

    • Client Virtual Machine level:
      • Accelerated networking enabled.
      • Provision one CPU/vCore for each parallel process that you need; in this case, 10 vCores.
      • Place the virtual machine in the same region as the database.
    • Database level:
      • Create a table with 20 columns.
      • As the PK is a sequential key, I included the option OPTIMIZE_FOR_SEQUENTIAL_KEY = ON in the clustered index definition.
      • Configure as many CPU/vCores as the maximum number of parallel processes that you would like to run. In this case, 10 vCores.
      • Depending on the amount of data, use Business Critical to reduce the storage latency.
    • Python code level:
      • Using executemany method in order to reduce the network roundtrips, sending only the value of the parameters.
      • Run in batches (1000, 10000) instead of a single process.
      • Use SET NOCOUNT ON to suppress the messages that the server would otherwise send back reporting how many rows were inserted.
      • When opening the connection, use autocommit=False so that each batch is committed explicitly.

An example of the Python code can be found here. The script reads a CSV file and, for every 10,000 rows, executes a bulk insert on a separate thread.

 

 

import csv
import pyodbc
import threading
import os
import datetime


class ThreadsOrder:
    """Run each bulk-insert batch on its own thread and track the threads."""

    def __init__(self):
        # Threads started so far; kept so the caller can wait for them.
        self._threads = []

    def ExecuteSQL(self, a, s, n):
        """Start a thread that calls the module-level ExecuteSQL(a, s, n).

        a -- list of rows for one batch
        s -- target table name
        n -- thread number, used only for logging
        """
        TExecutor = threading.Thread(target=ExecuteSQL, args=(a, s, n,))
        self._threads.append(TExecutor)
        TExecutor.start()

    def WaitAll(self):
        """Block until every thread started through this object has finished."""
        for TExecutor in self._threads:
            TExecutor.join()
        self._threads = []


def SaveResults(Message, bSaveFile):
    """Print Message and, when bSaveFile is true, append it to the log file.

    The log line is prefixed with the current local time. Failures while
    printing or writing are reported on stdout and never propagated.
    """
    try:
        print(Message)
        if bSaveFile:
            # 'with' guarantees the handle is closed even if write() fails.
            with open(filelog, "a") as file_object:
                file_object.write(datetime.datetime.strftime(datetime.datetime.now(), '%d/%m/%y %H:%M:%S') + '-' + Message + '\n')
    except Exception as e:
        print('And error occurred - ', format(e))


def _OpenConnection():
    """Open a pyodbc connection using the module-level connectivity settings."""
    return pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password, autocommit=False, Timeout=3600)


def ExecuteSQLcc(sTableName):
    """Drop and recreate the demo table and its clustered index.

    sTableName -- name of the table to (re)create. It is concatenated into
                  the DDL as-is, so it must come from trusted configuration.

    Any error is logged through SaveResults and not re-raised.
    """
    cnxn1 = None
    try:
        cnxn1 = _OpenConnection()
        cursor = cnxn1.cursor()

        # The trailing space matters: without it the table name is glued to
        # the EXISTS keyword.
        cursor.execute("DROP TABLE IF EXISTS " + sTableName)
        cursor.commit()

        columns = ["[Key] [int] NOT NULL", "[Num_TEST] [int] NULL"]
        columns += ["[TEST_%02d] [varchar](6) NULL" % i for i in range(1, 21)]
        cursor.execute("CREATE TABLE " + sTableName + " ( " + ", ".join(columns) + ")")
        cursor.commit()

        cursor.execute("CREATE CLUSTERED INDEX [ix_ms_example] ON " + sTableName + " ([Key] ASC) WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) ON [PRIMARY]")
        cursor.commit()
    except Exception as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)
    finally:
        if cnxn1 is not None:
            cnxn1.close()


def ExecuteSQL(a, sTableName, n):
    """Insert one batch of rows into sTableName on its own connection.

    a          -- list of rows; each row supplies the 22 column values
    sTableName -- target table name (trusted configuration, concatenated as-is)
    n          -- thread number for logging, or -1 for a synchronous call

    The batch is sent with executemany and committed as one transaction.
    Any error is logged through SaveResults and not re-raised.
    """
    cnxn1 = None
    try:
        Before = datetime.datetime.now()
        sTypeProcess = "NoAsync" if n == -1 else "Async - Thread:" + str(n)
        SaveResults('Executing at ' + str(Before) + " Process Type: " + sTypeProcess, True)

        cnxn1 = _OpenConnection()
        cursor = cnxn1.cursor()
        cursor.fast_executemany = True
        cursor.executemany("SET NOCOUNT ON;INSERT INTO " + sTableName + " ([Key], Num_TEST, TEST_01, TEST_02, TEST_03, TEST_04, TEST_05, TEST_06, TEST_07, TEST_08, TEST_09, TEST_10, TEST_11, TEST_12, TEST_13, TEST_14, TEST_15, TEST_16, TEST_17, TEST_18, TEST_19, TEST_20) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", a)
        cursor.commit()
        SaveResults('Time Difference INSERT process ' + str(datetime.datetime.now() - Before) + " " + sTypeProcess, True)
    except Exception as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)
    finally:
        # Each batch opens its own connection, so release it when done.
        if cnxn1 is not None:
            cnxn1.close()


def _DispatchBatch(a, BeforeFile, line_count):
    """Log the read time for a batch and hand it to a new insert thread."""
    global nThread
    deltaFile = datetime.datetime.now() - BeforeFile
    nThread = nThread + 1
    SaveResults('Time Difference Reading file is ' + str(deltaFile) + ' for ' + str(line_count) + ' rows', True)
    pThreadOrder.ExecuteSQL(a, sTableName, nThread)  # Open a new thread per transaction batch size.
    # Synchronous alternative: ExecuteSQL(a, sTableName, -1)


# Connectivity details.
SQL_server = 'tcp:servername.database.windows.net,1433'
SQL_database = 'databasename'
SQL_user = 'username'
SQL_password = 'password'

# File details to read.
filepath = 'c:\\k\\'  # Folder that holds the demo CSV files.
filelog = os.path.join(filepath, 'Error.log')  # Log file.

chunksize = 10000  # Transaction batch rows.
sTableName = "[test_data]"  # Table name (dummy).
pThreadOrder = ThreadsOrder()
nThread = 0  # Number of threads started -- right now the count is unlimited.

ExecuteSQLcc(sTableName)

Before = datetime.datetime.now()
line_count = 0
for directory, subdirectories, files in os.walk(filepath):
    for file in files:
        name, ext = os.path.splitext(file)
        if ext != '.csv':
            continue

        a = []
        # Restart the count for every file so that each file's own header
        # row (line 1) is skipped, not only the first file's.
        line_count = 0
        SaveResults('Reading the file ' + name, True)
        BeforeFile = datetime.datetime.now()
        # newline='' is what the csv module expects for correct handling of
        # quoted fields that contain line breaks.
        with open(os.path.join(directory, file), mode='r', newline='') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            for row in csv_reader:
                line_count += 1
                if line_count == 1:
                    continue  # Header row.
                a.append(row)
                if len(a) >= chunksize:
                    _DispatchBatch(a, BeforeFile, line_count)
                    a = []  # Rebind; the thread keeps the old list.
                    BeforeFile = datetime.datetime.now()

        # Rows left over after the last full batch must be inserted too.
        if a:
            _DispatchBatch(a, BeforeFile, line_count)

        SaveResults('Total Time Difference Reading file is ' + str(datetime.datetime.now() - Before) + ' for ' + str(line_count) + ' rows for the file: ' + name, True)

# Wait for the inserts so the end-to-end elapsed time can be reported.
pThreadOrder.WaitAll()
SaveResults('Total Time Difference including INSERT threads is ' + str(datetime.datetime.now() - Before), True)

 

 

 

During the execution, if you need to see the connections, the number of rows inserted and the impact in terms of resources, run the following T-SQL:

 

 

 

-- Requests currently executing in every session except this one, with the
-- text of the running statement flattened to a single line (max 512 chars).
SELECT
    SUBSTRING(
        REPLACE(
            REPLACE(
                SUBSTRING(
                    ST.text,
                    (req.statement_start_offset / 2) + 1,
                    ((CASE req.statement_end_offset
                          WHEN -1 THEN DATALENGTH(ST.text)
                          ELSE req.statement_end_offset
                      END - req.statement_start_offset) / 2) + 1
                ),
                CHAR(10), ' '
            ),
            CHAR(13), ' '
        ),
        1, 512
    )                           AS statement_text,
    req.database_id,
    sess.program_name,
    req.session_id,
    req.cpu_time                AS cpu_time_ms,
    req.status,
    req.wait_time,
    req.wait_resource,
    req.wait_type,
    req.last_wait_type,
    req.total_elapsed_time,
    sess.total_scheduled_time,
    req.row_count               AS [Row Count],
    req.command,
    req.scheduler_id,
    sess.memory_usage,
    req.writes,
    req.reads,
    req.logical_reads,
    req.blocking_session_id
FROM sys.dm_exec_requests AS req
INNER JOIN sys.dm_exec_sessions AS sess
    ON sess.session_id = req.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) AS ST
WHERE req.session_id <> @@SPID;

-- Number of rows inserted so far.
SELECT COUNT(*)
FROM test_data;

-- Resource usage samples for the database, most recent first.
SELECT *
FROM sys.dm_db_resource_stats
ORDER BY end_time DESC;

 

 

 

Enjoy!

 

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.