數據遷移腳本優化過程:從MySQL到Django模型表

互聯架構唠唠嗑 2024-06-27 17:57:11

在大規模的數據遷移過程中,性能問題往往是開發者面臨的主要挑戰之一。本文將分析一個數據遷移腳本的優化過程,展示如何從 MySQL 數據庫遷移數據到 Django 模型表,並探討優化前後的性能差異。

優化前的腳本分析

優化前的腳本按批次從 MySQL 數據庫中讀取數據,並將其插入到 Django 模型表中。每次讀取的數據量由 batch_size 確定。以下是優化前的關鍵部分:

fetch_sql = f""" SELECT search_rank, search_term, `period`, report_date FROM hot_search_terms_table WHERE period = '{period}' LIMIT %s OFFSET %s;"""

每次查詢使用 LIMIT 和 OFFSET 子句,OFFSET 指定從哪一行開始讀取。然而,隨著數據量的增加,OFFSET 會導致性能顯著下降,因爲數據庫必須掃描更多行來確定結果集的起點。

優化後的腳本分析

優化後的腳本通過使用遞增的主鍵 ID 進行分頁查詢,避免了 OFFSET 帶來的性能問題。以下是優化後的關鍵部分:

fetch_sql = f""" SELECT id, search_rank, search_term,`period`, report_date FROM hot_search_terms_table WHERE period = %s AND id > %s ORDER BY id ASC LIMIT %s;"""

通過 WHERE id > %s 和 ORDER BY id ASC,我們可以確保每次查詢的結果集都是按主鍵 ID 排序的,性能大大提高,因爲數據庫可以直接從上一次查詢結束的地方開始讀取數據。

優化前後的性能比較

優化前的性能問題

查詢性能下降:隨著 OFFSET 值的增加,查詢性能會顯著下降。數據庫需要掃描所有的行,直到達到指定的偏移量,然後返回後續的行。長時間等待:當數據量較大時,隨著偏移量的增加,每次查詢所需的時間會變得越來越長。

優化後的性能改進

高效的分頁查詢:使用遞增的主鍵 ID 進行分頁查詢,避免了掃描大量無關行的數據。穩定的查詢時間:每次查詢都只需讀取新的數據,無需掃描之前已經處理過的數據行,查詢時間穩定且較快。實施細節

優化前的實現

優化前的實現通過讀取偏移量文件來記錄上次處理的位置,每次查詢都從該位置開始,讀取一批數據並插入到 Django 模型表中:

class Command(BaseCommand): # 省略部分代碼... def handle(self, *args, **kwargs): try: # 連接數據庫 mysql_conn = mysql.connector.connect(**mysql_config) mysql_cursor = mysql_conn.cursor() # 批次處理 while True: self.stdout.write(self.style.SUCCESS(f"正在獲取 {offset} - {offset + batch_size} 行的數據")) zhilin_cursor.execute(fetch_sql, (batch_size, offset)) batch_data = zhilin_cursor.fetchall() if not batch_data: break # 轉換並插入數據 objects = [HotSearchTermsReportABA(...) for row in batch_data] with transaction.atomic(): HotSearchTermsReportABA.objects.bulk_create(objects) offset += batch_size total_rows_transferred += len(batch_data) self.update_last_offset(offset) except Error as e: # 錯誤處理 self.stdout.write(self.style.ERROR(f"傳輸過程中出現異常:{e}"))

優化後的實現

優化後的實現使用主鍵 ID 進行分頁查詢,並記錄上次處理的最大 ID:

class Command(BaseCommand): # 省略部分代碼... def handle(self, *args, **kwargs): try: # 連接數據庫 mysql_conn = mysql.connector.connect(**mysql_config) mysql_cursor = mysql_conn.cursor() # 批次處理 while True: self.stdout.write(self.style.SUCCESS(f"正在獲取 ID 大于 {last_id} 的 {self.batch_size} 行數據")) zhilin_cursor.execute(fetch_sql, (period, last_id, self.batch_size)) batch_data = zhilin_cursor.fetchall() if not batch_data: break # 轉換並插入數據 objects = [HotSearchTermsReportABA(...) for row in batch_data] with transaction.atomic(): HotSearchTermsReportABA.objects.bulk_create(objects) last_id = batch_data[-1][0] total_rows_transferred += len(batch_data) self.update_last_id(last_id) except Error as e: # 錯誤處理 self.stdout.write(self.style.ERROR(f"傳輸過程中出現異常:{e}"))總結

通過上述優化過程解決了數據量增大導致的查詢性能下降問題。具體優化策略包括:

使用主鍵 ID 進行分頁查詢,避免 OFFSET 帶來的性能問題。確保每次查詢只讀取新的數據,減少數據庫掃描的行數。

作者:pycode鏈接:https://juejin.cn/post/7384697662614519823

0 阅读:0

互聯架構唠唠嗑

簡介:感謝大家的關注