Lesson Learned #463: Synchronizing Large Data Volumes Between SQL Databases: A Python Solution

This post has been republished via RSS; it originally appeared at: Microsoft Tech Community - Latest Blogs - .

While working on a support case, we faced the need to replicate a substantial volume of data in a table to conduct performance comparisons between two different SQL databases. During this troubleshooting process, we crafted a small yet powerful Python script to accomplish this task, which might be beneficial in similar scenarios. Given that the source table contained several million records, the utilization of multithreading was crucial to handle this massive data efficiently. Moreover, every execution of the script clearly indicated the GUID of the most recently processed data, adding a layer of traceability to our operations. As always, this script serves as an example and should be thoroughly tested in a controlled environment, with users taking responsibility for its deployment.

 

In the realm of database management, particularly during support and performance tuning tasks, mirroring large datasets between different databases is a common yet complex challenge. Our objective was straightforward but technically demanding: replicate extensive data volumes from one SQL database table to another to facilitate accurate performance benchmarking.

 

Overview of the Python Script

Our script employs Python’s pyodbc module, which facilitates interaction with SQL Server databases, and utilizes multi-threading via the concurrent.futures module to enhance performance. The script features:

  1. Robust Database Connection: Using pyodbc, the script establishes connections with both source and target databases, implementing retry logic for reliability.

  2. Concurrent Data Processing: The ThreadPoolExecutor is used for parallel processing, speeding up the data synchronization.

  3. Intelligent Data Handling: The script employs SQL MERGE statements, ensuring that data is either updated or inserted as needed, enhancing efficiency.

  4. Transaction Management: With autocommit set to False, the script handles transactions manually, ensuring data integrity.

  5. Process Monitoring: Utilization of GUIDs and timestamps to track the synchronization process.

 

Script Walkthrough

  1. Connection Configuration:

    • Establish connections to the SQL Server databases (A and B) with retry capabilities for robustness.
  2. Total Rows Calculation:

    • Determine the number of rows to be synchronized from Database A to Database B using SQL Server's Dynamic Management Views (DMVs).
  3. Data Processing in Batches:

    • The script reads data in batches and synchronizes it, reducing memory usage and improving efficiency.
  4. Merge Operation:

    • Using a temporary table and MERGE SQL statements, the script ensures that each record is either updated or inserted in Database B, marked with a unique process GUID for traceability.
  5. Transaction Management:

    • The script uses explicit transaction control for each batch process, ensuring data integrity.
  6. Performance Tracking:

    • The script records the start and end times, providing insights into the process duration.

 

-- Structure for Server A

CREATE TABLE [dbo].[Table1](
	[ID] [bigint] IDENTITY(1,1) NOT NULL,
	[Item1] [nvarchar](200) NULL,
 CONSTRAINT [PK__Table1__3214EC2758D12B45] PRIMARY KEY CLUSTERED 
(
	[ID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

DECLARE @count INT = 0;

WHILE @count < 20000
BEGIN
    INSERT INTO [dbo].[Table1] ([Item1])
    VALUES (CONCAT('Item ', @count + 1));

    SET @count = @count + 1;
END;


 

-- Script for ServerB

CREATE TABLE [dbo].[Table1](
	[ID] [bigint] NOT NULL,
	[Item1] [nvarchar](200) NULL,
	[ProcessGUID] [uniqueidentifier] NULL,
 CONSTRAINT [PK__Table1__3214EC27FB61D54C] PRIMARY KEY CLUSTERED 
(
	[ID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

 

import pyodbc
import time
import sys
import uuid
from concurrent.futures import ThreadPoolExecutor
import datetime

# Configuración de conexión a las bases de datos
conn_str_A = 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=servernamea.database.windows.net;DATABASE=A;UID=user1;PWD=pwd1!;APP=Test-Sync Reader'
conn_str_B = 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=servernameb.database.windows.net;DATABASE=B;UID=user2;PWD=pwd2!;APP=Test-Sync Writer'

def create_connections(conn_str, count, max_retries=3, delay=5):
    """Create a list of database connections with retry policy."""
    connections = []
    for _ in range(count):
        retries = 0
        while retries < max_retries:
            try:
                conn = pyodbc.connect(conn_str, timeout=30,autocommit=False)
                connections.append(conn)
                break  # Exit the retry loop if connection is successful
            except pyodbc.Error as e:
                print(f"Failed to connect to database: {e}")
                retries += 1
                if retries >= max_retries:
                    print("Maximum retry attempts reached. Exiting the application.")
                    sys.exit(1)  # Exit the application if all retries fail
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)  # Wait for a while before retrying
    return connections

def get_total_rows(conn, table_name):
    """Retrieve an estimated number of rows in a table using DMV."""
    try:
        with conn.cursor() as cursor:
            # Query to get an estimated row count
            query = f"""
                SELECT SUM(row_count) 
                FROM sys.dm_db_partition_stats 
                WHERE object_id=OBJECT_ID('{table_name}') 
                AND (index_id=0 OR index_id=1);
            """
            cursor.execute(query)
            result = cursor.fetchone()
            if result and result[0]:
                return result[0]
            else:
                print("No rows found or unable to retrieve row count.")
                sys.exit(1)
    except pyodbc.Error as e:
        print(f"Error retrieving row count: {e}")
        sys.exit(1)

def process_block_A_to_B(conn_A, conn_B, offset, limit, process_guid, batch_size=10000):
    """Process a block of data from Database A to B in batches using SQL MERGE."""
    records = []
    with conn_A.cursor() as cursor_A:
        print(f"Thread: {offset} {limit}")
        cursor_A.execute(f"SELECT ID, Item1 FROM Table1 ORDER BY ID OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY")
        
        for id, item1 in cursor_A:
            records.append((id, item1, process_guid))
            if len(records) == batch_size:
                merge_into_database_B(conn_B, records)
                records = []

        if records:
            merge_into_database_B(conn_B, records)

def merge_into_database_B(conn, records):
    """Merge records into Database B within a single transaction using a process GUID."""
    with conn.cursor() as cursor:
        try:
            cursor.fast_executemany = True
            cursor.execute("BEGIN TRANSACTION")
            cursor.execute("CREATE TABLE #TempTable (ID bigint, Item1 nvarchar(200), ProcessGUID uniqueidentifier)")
            cursor.executemany("INSERT INTO #TempTable (ID, Item1, ProcessGUID) VALUES (?, ?, ?)", records)
            conn.commit()
        except Exception as e:
            print(f"Error during table creation and data insertion: {e}")
            conn.rollback()
            return

        try:
            cursor.execute("BEGIN TRANSACTION")
            merge_query = """
                MERGE INTO Table1 AS Target
                USING #TempTable AS Source
                ON Target.ID = Source.ID
                WHEN MATCHED THEN 
                    UPDATE SET Target.Item1 = Source.Item1, Target.ProcessGUID = Source.ProcessGUID
                WHEN NOT MATCHED THEN
                    INSERT (ID, Item1, ProcessGUID) VALUES (Source.ID, Source.Item1, Source.ProcessGUID);
            """
            cursor.execute(merge_query)
            conn.commit()
        except Exception as e:
            print(f"Error during MERGE operation: {e}")
            conn.rollback()
        finally:
            cursor.execute("DROP TABLE #TempTable")

def main():
    start_time = datetime.datetime.now()  
    print(f"Process started at: {start_time}")
    num_workers = 10
    process_guid = generate_guid()
    print(f"Process GUID: {process_guid}")    
    connections_A = create_connections(conn_str_A, num_workers)
    connections_B = create_connections(conn_str_B, num_workers)

    total_rows_A = get_total_rows(connections_A[0], "Table1")

    limit_A = (total_rows_A + num_workers - 1) // num_workers  

    with ThreadPoolExecutor(num_workers) as executor:
        for i in range(num_workers):
            offset_A = i * limit_A
            # Ajustar el límite para el último worker
            if i == num_workers - 1:
                actual_limit_A = total_rows_A - offset_A
            else:
                actual_limit_A = limit_A

            executor.submit(process_block_A_to_B, connections_A[i], connections_B[i], offset_A, actual_limit_A, process_guid)

    end_time = datetime.datetime.now()  
    print(f"Process completed at: {end_time}")
    print(f"Total duration: {end_time - start_time}")

def generate_guid():
    """Generate a new GUID."""
    return uuid.uuid4()

if __name__ == "__main__":
    main()

 

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.