在大規模的數據遷移過程中,性能問題往往是開發者面臨的主要挑戰之一。本文將分析一個數據遷移腳本的優化過程,展示如何從 MySQL 數據庫遷移數據到 Django 模型表,並探討優化前後的性能差異。
![](http://image.uc.cn/s/wemedia/s/upload/2024/36417d51ee7021cf920b70ff4c40b444.jpg)
優化前的腳本按批次從 MySQL 數據庫中讀取數據,並將其插入到 Django 模型表中。每次讀取的數據量由 batch_size 確定。以下是優化前的關鍵部分:
fetch_sql = f""" SELECT search_rank, search_term, `period`, report_date FROM hot_search_terms_table WHERE period = '{period}' LIMIT %s OFFSET %s;"""每次查詢使用 LIMIT 和 OFFSET 子句,OFFSET 指定從哪一行開始讀取。然而,隨著數據量的增加,OFFSET 會導致性能顯著下降,因爲數據庫必須掃描更多行來確定結果集的起點。
優化後的腳本分析優化後的腳本通過使用遞增的主鍵 ID 進行分頁查詢,避免了 OFFSET 帶來的性能問題。以下是優化後的關鍵部分:
fetch_sql = f""" SELECT id, search_rank, search_term,`period`, report_date FROM hot_search_terms_table WHERE period = %s AND id > %s ORDER BY id ASC LIMIT %s;"""通過 WHERE id > %s 和 ORDER BY id ASC,我們可以確保每次查詢的結果集都是按主鍵 ID 排序的,性能大大提高,因爲數據庫可以直接從上一次查詢結束的地方開始讀取數據。
優化前後的性能比較優化前的性能問題
查詢性能下降:隨著 OFFSET 值的增加,查詢性能會顯著下降。數據庫需要掃描所有的行,直到達到指定的偏移量,然後返回後續的行。長時間等待:當數據量較大時,隨著偏移量的增加,每次查詢所需的時間會變得越來越長。優化後的性能改進
高效的分頁查詢:使用遞增的主鍵 ID 進行分頁查詢,避免了掃描大量無關行的數據。穩定的查詢時間:每次查詢都只需讀取新的數據,無需掃描之前已經處理過的數據行,查詢時間穩定且較快。實施細節優化前的實現
優化前的實現通過讀取偏移量文件來記錄上次處理的位置,每次查詢都從該位置開始,讀取一批數據並插入到 Django 模型表中:
class Command(BaseCommand): # 省略部分代碼... def handle(self, *args, **kwargs): try: # 連接數據庫 mysql_conn = mysql.connector.connect(**mysql_config) mysql_cursor = mysql_conn.cursor() # 批次處理 while True: self.stdout.write(self.style.SUCCESS(f"正在獲取 {offset} - {offset + batch_size} 行的數據")) zhilin_cursor.execute(fetch_sql, (batch_size, offset)) batch_data = zhilin_cursor.fetchall() if not batch_data: break # 轉換並插入數據 objects = [HotSearchTermsReportABA(...) for row in batch_data] with transaction.atomic(): HotSearchTermsReportABA.objects.bulk_create(objects) offset += batch_size total_rows_transferred += len(batch_data) self.update_last_offset(offset) except Error as e: # 錯誤處理 self.stdout.write(self.style.ERROR(f"傳輸過程中出現異常:{e}"))優化後的實現
優化後的實現使用主鍵 ID 進行分頁查詢,並記錄上次處理的最大 ID:
class Command(BaseCommand): # 省略部分代碼... def handle(self, *args, **kwargs): try: # 連接數據庫 mysql_conn = mysql.connector.connect(**mysql_config) mysql_cursor = mysql_conn.cursor() # 批次處理 while True: self.stdout.write(self.style.SUCCESS(f"正在獲取 ID 大于 {last_id} 的 {self.batch_size} 行數據")) zhilin_cursor.execute(fetch_sql, (period, last_id, self.batch_size)) batch_data = zhilin_cursor.fetchall() if not batch_data: break # 轉換並插入數據 objects = [HotSearchTermsReportABA(...) for row in batch_data] with transaction.atomic(): HotSearchTermsReportABA.objects.bulk_create(objects) last_id = batch_data[-1][0] total_rows_transferred += len(batch_data) self.update_last_id(last_id) except Error as e: # 錯誤處理 self.stdout.write(self.style.ERROR(f"傳輸過程中出現異常:{e}"))總結通過上述優化過程解決了數據量增大導致的查詢性能下降問題。具體優化策略包括:
使用主鍵 ID 進行分頁查詢,避免 OFFSET 帶來的性能問題。確保每次查詢只讀取新的數據,減少數據庫掃描的行數。作者:pycode鏈接:https://juejin.cn/post/7384697662614519823