記一次線上安全測試中誤用父類屬性導致數據汙染的解決方案

互聯架構唠唠嗑 2024-07-01 17:40:45

在線上安全測試的過程中,會使用 Nmap 進行端口掃描,爲了提升端口掃描的效率,掃描策略通常是檢測常用端口是否處于開放狀態,並在父類中使用名爲 all_open_ports 的屬性來記錄這些開放的端口。

在後續的測試過程中,需要檢查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要進一步對這些端口進行開放檢測。如果端口的檢測結果是開放的,測試將繼續進行並將這些端口記錄到 all_open_ports 中,以便在下次遇到相同端口時無需重複檢測。

然而,由于安全測試是多線程進行的,某些情況下可以將 all_open_ports 理解爲共享變量,這導致當兩個不同的測試環境同時進行安全測試時,數據相互汙染,從而影響最終測試結果的准確性。

爲了解決這個問題,需要重新設計變量 all_open_ports 的存儲和訪問方式,以確保在多線程環境下數據的獨立性和一致性,接下來由博主爲各位讀者進行仔細講解。

博文中的所有代碼全部收集在博主的 GitHub 倉庫中:http://github.com/sid10t/project-actual-code/tree/main

場景複現

先創建一個父類 Parent,定義一個類屬性 all_open_ports 用來記錄已經開放的端口,並創建一個方法 check_port() 來模擬端口檢測,代碼如下所示:

class Parent: all_open_ports = set() def __init__(self, args): self.all_open_ports.update(args.get("open_ports", [])) def check_port(self, port): # 忽略端口掃描... if port not in self.all_open_ports: self.all_open_ports.add(port) print(f"{port} in all_open_ports, {self.all_open_ports}") pass

再創建一個子類 Child 繼承父類,構造 scan() 方法來模擬安全測試過程,代碼如下所示:

import threadingfrom parent import Parentclass Child(Parent): def __init__(self, args): super().__init__(args) self.port = args.get("port") def scan(self): print(threading.current_thread().name, self.all_open_ports) self.check_port(self.port) pass

最後創建一個測試用例,實例化兩個 Child 對象,並以多線程的方式運行對象方法 scan() 來進行場景複現,代碼如下所示:

def test_thread(): c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]}) t1 = threading.Thread(target=c1.scan, name="Child_1") t1.start() t1.join() c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]}) t2 = threading.Thread(target=c2.scan, name="Child_2") t2.start() t2.join() print("All tasks have finished!")

運行結果:

根因分析

造成上述問題的根本原因就是在多線程中 all_open_ports 可被當成共享變量使用,致使數據相互汙染,從而影響最終測試結果的准確性。

因爲 all_open_ports 是在父類中定義的一個類屬性,這意味著它是類 Parent 的一部分,它被所有派生類(子類)所共享。通過這種方式,父類的所有子類都可以訪問和更新 all_open_ports 屬性。

每當子類的實例創建時,如果傳遞了 open_ports 參數,那麽這些端口將被添加到 all_open_ports 集合中,並且在父類中的 check_port 方法中,判斷給定端口 port 是否存在于 all_open_ports 集合中,如果不存在,則將端口添加到集合中。這樣,所有子類實例都可以共享和更新這個屬性。

現在我們修改部分代碼,在打印時輸出 all_open_ports 的地址來判斷是否使用了同一變量,代碼如下所示:

def scan(self): print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports)) self.check_port(self.port) pass

運行結果:

那麽有什麽方法能解決當前的問題呢?

重新初始化 all_open_ports;上下文管理 contextvar;線程本地變量 thread.local;

重新初始化 all_open_ports

重新初始化 all_open_ports 的方法是最快捷的,但是會有一個問題,重新初始化 all_open_ports 會使得每個 Child 對象都有自己獨立的 all_open_ports 集合,而不會共享相同的集合,這會發生重複檢測端口的情況,也就違背了一開始的設計初衷。

創建一個測試用例來觀察一下當前的 all_open_ports 集合使用情況,代碼如下所示:

def test_init_set(): c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]}) c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]}) print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports) c1.scan() print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports) c2.scan() print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)

運行結果:

根據運行結果可以發現, all_open_ports 集合在當前情況下可以被看做是共享變量,哪怕在不同的線程中,個 Child 對象都能共享 all_open_ports 集合。

這時候,修改父類 Parent 中的 __init__ 代碼,使得 all_open_ports 集合在 __init__ 時重新初始化,代碼如下所示:

def __init__(self, args): self.all_open_ports = set() self.all_open_ports.update(args.get("open_ports", []))

運行結果:

根據運行結果可以發現,c1 和 c2 中的 all_open_ports 是完全獨立的集合,c1 向 all_open_ports 集合中的增加操作不會影響到 c2,這雖然避免了數據汙染,但是會導致在 c1 檢測過的端口還需要在 c2 重新進行檢測,這與我們一開始設計 all_open_ports 集合來提升效率的想法背道而馳了。

上下文管理 contextvar

先說結論,好像不行,不知道是不是思路有問題,希望各位大神指點一下!

運行結果:

代碼如下所示:

class ParentContext: all_open_ports = contextvars.ContextVar("all_open_ports", default=set()) def __init__(self, args): open_ports = self.all_open_ports.get() open_ports.update(args.get("open_ports", [])) self.all_open_ports.set(open_ports) def check_port(self, port): all_open_ports_ = self.all_open_ports.get() if port not in all_open_ports_: all_open_ports_.add(port) self.all_open_ports.set(all_open_ports_) print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")class ChildContext(ParentContext): def __init__(self, args): super().__init__(args) self.port = args.get("port") def scan(self, port=None): self.check_port(port or self.port) passdef test_contextvars(open_ports, port): c1 = ChildContext({"port": port, "open_ports": open_ports}) c1.scan()if __name__ == '__main__': t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,)) t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,)) t1.start() t2.start() t1.join() t2.join()

線程本地變量 thread.local

threading.local() 是 Python 標准庫中的一個類,它提供了一種在多線程環境下創建線程本地存儲的機制。它允許每個線程都有自己獨立的變量副本,這些變量在不同線程之間是相互隔離的,不會相互幹擾。

當多個線程同時執行時,它們可以訪問和修改各自的線程本地變量,而不會影響其他線程的變量。這對于需要在線程之間共享數據,但又需要保持數據獨立性的情況非常有用。

接下來,我們創建父類 ParentLocal,並使用 threading.local() 來存儲集合 all_open_ports,代碼如下所示:

class ParentLocal: local = threading.local() def __init__(self, args): self.local.all_open_ports = getattr(self.local, "all_open_ports", set()) self.local.all_open_ports.update(args.get("open_ports", [])) def check_port(self, port): if port not in self.local.all_open_ports: self.local.all_open_ports.add(port) print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}") def print_prefix(self): return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"

在上述代碼中,ParentLocal 類定義了初始化方法 __init__,通過 getattr() 函數來獲取 self.local.all_open_ports 的值。如果 self.local.all_open_ports 不存在,則使用 set() 創建一個空的集合,並將其賦值給 self.local.all_open_ports。然後,我們使用 update() 方法將 args.get("open_ports", []) 中的端口添加到 self.local.all_open_ports 中。

通過使用 ParentLocal 類,我們可以在多線程環境中創建多個實例,並且每個實例都有自己獨立的 all_open_ports 變量。這樣,不同線程的實例之間的數據不會相互幹擾。

而 Child 類與之前基本保持不變,代碼如下所示:

class ChildLocal(ParentLocal): def __init__(self, args): super().__init__(args) self.port = args.get("port") def scan(self, port=None): self.check_port(port or self.port) pass

在上述代碼中,ChildLocal 類是繼承自 ParentLocal 類的子類,通過繼承關系它可以訪問父類的 self.local.all_open_ports 集合。這使得 ChildLocal 實例可以在同一線程下共享數據,同時不會受到其他線程中的 ChildLocal 實例的影響。

編寫測試代碼如下所示:

def tset_local(open_ports, port): c1 = ChildLocal({"port": port, "open_ports": open_ports}) c1.scan() args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)} print(threading.current_thread().name, args) c2 = ChildLocal(args) c2.scan() time.sleep(3) c1.scan(random.randint(8000, 9999))if __name__ == '__main__': t1 = threading.Thread(target=tset_local, name="Child_1", args=([80, 3306, 5000], 5001,)) t2 = threading.Thread(target=tset_local, name="Child_2", args=([22, 3306, 6000], 6001,)) t1.start() t2.start() t1.join() t2.join()

運行結果:

如我們所料,Child1 和 Child2 線程中的 ChildLocal 實例相互之間共享 all_open_ports 集合的數據,但是不同線程之間的 ChildLocal 實例不能相互共享數據。

需要注意的是,threading.local() 對象在不同的線程中具有相同的 id 值,這是因爲它們實際上是同一個對象的不同實例。每個線程都有自己獨立的 threading.local() 對象,但它們共享相同的類定義。

當在不同的線程中創建 threading.local() 對象時,每個線程都會創建一個新的實例,但這些實例的類定義是相同的。因此,它們的 id 值是相同的。

後記

幸好我們及時發現了這個問題,並沒有造成安全事故。現在我們將這次經曆分享出來,希望能給其他開發團隊帶來啓發,共同提高系統的安全性和穩定性。

作者:sidiot鏈接:https://juejin.cn/post/7384326861964476425

0 阅读:0

互聯架構唠唠嗑

簡介:感謝大家的關注