Skip to content

多线程编程

基本概念

  • 线程:线程是程序执行的最小单位,多个线程可以在同一进程中并发执行。
  • 进程:进程是资源分配的最小单位,进程之间是独立的,互相不能直接访问对方的内存。
  • 并发 vs 并行:并发是指多个任务在同一时间段内交替执行,并行是指多个任务在同一时刻同时执行(需要多核处理器支持)。

线程的创建

import threading
import time

# 方法1,继承Thread类
class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print(f"Starting {self.name}")
        time.sleep(2)
        print(f"Exiting {self.name}")

# 创建线程
thread1 = MyThread("Thread-1")
thread2 = MyThread("Thread-2")

# 启动线程
thread1.start()
thread2.start()

# 等待线程完成
thread1.join()
thread2.join()

print("Exiting Main Thread")

#方法2,创建Thread对象
def print_time(threadName, delay):
    print(f"Starting {threadName}")
    time.sleep(delay)
    print(f"Exiting {threadName}")

# 创建线程
thread1 = threading.Thread(target=print_time, args=("Thread-1", 2))
thread2 = threading.Thread(target=print_time, args=("Thread-2", 4))

# 启动线程
thread1.start()
thread2.start()

# 等待线程完成
thread1.join()
thread2.join()

print("Exiting Main Thread")

线程的同步和通信

可用方式: - 锁(Lock):确保只有一个线程可以访问共享资源。 - 信号量(Semaphore):控制对多个资源的访问。 - 条件变量(Condition Variable):线程可以等待某个条件发生。 - 事件(Event):允许线程等待某个事件的发生。 - 消息队列(Queue):线程安全的队列,用于线程间通信。

使用锁的计数器实现:

"""
Counter 类:定义一个计数器类,包含一个初始值为 0 的 value 变量和一个 lock 对象。increment 方法使用 lock 来确保只有一个线程可以在同一时间修改 value 的值。

worker 函数:每个线程执行的函数。它接受一个 Counter 对象和一个指定的增加次数。在每次调用 increment 方法时,它会增加计数器的值。

主程序:设置线程数量和每个线程增加计数器的次数。创建一个 Counter 对象和一个线程列表。然后,启动所有线程并等待它们完成。

线程同步:使用 Lock 对象来确保线程安全。with self.lock 语句确保在进入和离开 increment 方法时自动获取和释放锁。
"""
import threading

class Counter:
    def __init__(self):
        self.lock = threading.Lock()
        self.value = 0

    def increment(self):
        with self.lock:
            self.value += 1

counter = Counter()

def worker():
    for _ in range(100000):
        counter.increment()

threads = []
for i in range(10):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()#使得所有线程都完成

print("Counter value:", counter.value)#结果是1000000

使用队列进行通信:

"""
生产者函数接受一个队列 q 作为参数。
循环生成 5 个整数(从 0 到 4),并将它们放入队列中。
每次放入数据后,打印 "Produced {i}",其中 i 是放入队列的数据。

消费者函数也接受一个队列 q 作为参数。
当队列不为空时,循环从队列中取出数据。
每次取出数据后,打印 "Consumed {item}",其中 item 是从队列中取出的数据。
"""
import threading
import queue

def producer(q):
    for i in range(5):
        q.put(i)
        print(f"Produced {i}")

def consumer(q):
    while not q.empty():
        item = q.get()
        print(f"Consumed {item}")

q = queue.Queue()

producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))

producer_thread.start()
producer_thread.join()  # 等待生产者线程完成

consumer_thread.start()
consumer_thread.join()  # 等待消费者线程完成

print("All tasks completed")

简单案例

哲学家进餐

哲学家就餐问题(Dining Philosophers Problem)是一个经典的同步问题,用来模拟并发程序中的资源竞争。问题描述如下:

有五个哲学家坐在圆桌旁,每个哲学家之间放置一根筷子。 哲学家必须同时拿起左右两边的筷子才能进餐,吃完后放下筷子继续思考。

import threading
import time

class Philosopher(threading.Thread):
    def __init__(self, name, left_fork, right_fork):
        threading.Thread.__init__(self)
        self.name = name
        self.left_fork = left_fork
        self.right_fork = right_fork

    def run(self):
        for i in range(3):
            print(f"{self.name} is thinking.")
            time.sleep(1)
            self.dine()

    def dine(self):
        fork1, fork2 = self.left_fork, self.right_fork
        while True:
            fork1.acquire()
            locked = fork2.acquire(False)
            if locked: break
            fork1.release()
            print(f"{self.name} swaps forks.")
            fork1, fork2 = fork2, fork1
        self.eat()
        fork2.release()
        fork1.release()

    def eat(self):
        print(f"{self.name} starts eating.")
        time.sleep(2)
        print(f"{self.name} finishes eating and leaves to think.")

def main():
    forks = [threading.Lock() for n in range(5)]
    philosopher_names = ['A', 'B', 'C', 'D', 'E']

    philosophers = [Philosopher(philosopher_names[i], forks[i % 5], forks[(i + 1) % 5])
                    for i in range(5)]

    for p in philosophers:
        p.start()

    for p in philosophers:
        p.join()

if __name__ == "__main__":
    main()

更进一步的实现

线程池

线程池(Thread Pool)是一种并发编程的技术,用于管理和复用线程,以提高多线程任务执行的效率和性能。线程池包含一组预先创建的线程,这些线程可以在需要时被动态地分配和重用,而不是每次任务到来时都创建新的线程。

from concurrent.futures import ThreadPoolExecutor
import time

def worker(name):
    print(f"Starting {name}")
    time.sleep(2)
    print(f"Exiting {name}")

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(worker, f"Thread-{i}") for i in range(5)]

    for future in futures:
        future.result()

print("All threads completed")

线程池参数的设置

最小连接数:是连接池一直保持的数据库连接,所以如果应用程序对数据库连接的使用量不大,将会有大量的数据库连接资源被浪费. 最大连接数:是连接池能申请的最大连接数,如果数据库连接请求超过次数,后面的数据库连接请求将被加入到等待队列中,这会影响以后的数据库操作 最大空闲时间 获取连接超时时间 超时重试连接次数

死锁检测和避免

死锁(Deadlock)是多线程或多进程并发编程中常见的问题,指的是两个或多个线程或进程在执行过程中因互相持有对方所需资源而无法继续执行的状态。

死锁产生的四个必要条件(四个条件全部满足时,死锁才会发生)是:

  1. 互斥条件(Mutual Exclusion): 指资源一次只能被一个线程或进程使用,即资源不能被共享。如果一个线程或进程已经获得了某个资源(如锁),其他线程或进程必须等待该资源释放。

  2. 请求与保持条件(Hold and Wait): 指一个线程或进程在持有至少一个资源的同时,又提出了对其他资源的请求,并且在等待期间不释放已经持有的资源。这样会导致其他线程或进程在等待被请求的资源时,无法继续执行。

  3. 不剥夺条件(No Preemption): 指系统不能抢占一个正在被其他线程或进程持有的资源,只能由持有资源的线程或进程自行释放。

  4. 循环等待条件(Circular Wait): 指一组线程或进程之间形成一种循环等待资源的关系,其中每个线程或进程都在等待下一个线程或进程所持有的资源。

这四个条件同时满足时,就可能导致死锁的发生。为了避免死锁,需要破坏这四个条件中的至少一个,具体可以:

  1. 按序获取锁
  2. 引入锁的层级顺序,要求所有线程按照相同的顺序获取锁。这样可以避免多个线程因锁的竞争而进入互相等待状态。

  3. 避免嵌套锁

  4. 尽量避免在持有一个锁的情况下去申请另一个锁。如果不得不这样做,确保获取锁的顺序是固定的,且所有线程都按照相同的顺序获取锁。

  5. 使用超时机制

  6. 在获取锁的过程中设置超时机制,如果超过一定时间未能获取到锁,就放弃或重新尝试获取。这可以避免线程因为无法获取锁而一直阻塞。

  7. 限制锁的持有时间

  8. 在使用锁的时候,尽量减少持有锁的时间。如果某些操作不需要对共享资源进行长时间的独占,可以考虑在临界区内尽快完成操作后释放锁。

  9. 死锁检测与恢复

  10. 实现死锁检测机制,及时发现死锁并进行恢复。例如,定期检查系统中的资源分配情况,发现潜在死锁后进行资源回收或者中断某些线程的执行。即允许系统剥夺某些线程的资源。
  11. 避免循环等待
  12. 确保系统中的资源分配是静态的,不会出现循环等待资源的情况。如果有循环等待的可能,可以通过资源预分配或者动态分配资源来避免。