一、单选题(10 题)
在 Python 中提供多线程支持的模块是( )。
A multiprocessing
B threading
C asyncio
D subprocess
✓ 正确
解析:threading 是 Python 标准库中提供多线程支持的模块。
下面哪一项是正确的创建线程对象并启动的写法( )。
A t = threading.Thread(target=func); t.start()
B t = threading.Thread(target=func()); t.start()
C t = threading.start(target=func)
D t = threading.Thread(); t.run()
✓ 正确
解析:Thread 的 target 参数传函数引用(不加括号),创建后调用 start() 启动。
如果线程任务函数定义是 def work(a, b),以下传参方式正确的是( )。
A args=[1,2]
B args=(1,2)
C kwargs=(a:1,b:2)
D target=work(1,2)
✓ 正确
解析:args 参数接收元组。
(1,2) 是元组,多个元素按位置对应函数形参。
关于 Python 线程间共享全局变量,说法正确的是( )。
A 线程之间不能共享全局变量
B 线程之间共享全局变量且任何操作都不需额外同步
C 线程之间共享全局变量但多线程修改同一变量时可能引发数据错误
D 必须使用 global 关键字声明后线程才能共享全局变量
✓ 正确
解析:线程共享全局变量,但多线程同时修改同一变量时可能出现数据竞争,需加锁同步。
互斥锁对象的类型是( )。
A threading.Mutex
B threading.Locker
C threading.Lock
D threading.Semaphore
✓ 正确
解析:threading.Lock 是互斥锁类,调用
threading.Lock() 创建锁对象。
死锁发生时的典型现象是( )。
A 线程正常运行结束
B 多个线程互相等待对方释放锁,程序无法继续
C 程序中所有锁同时被释放
D 线程自动放弃锁并退出
✓ 正确
解析:死锁是指多个线程互相等待对方持有的资源,导致所有线程都无法继续执行。
关于 CPython 的 GIL(全局解释器锁),说法错误的是( )。
A GIL 使得同一时刻通常只有一个线程执行 Python 字节码
B GIL 可以自动保护所有共享变量,多线程编程不需要额外加锁
C GIL 是 CPython 解释器层面的设计,与用户主动创建的 Lock 不同
D 受 GIL 影响多线程不能有效利用多核做计算密集型任务
✓ 正确
解析:GIL 只保证字节码级别的安全,但 non-atomic 操作仍可能出问题,仍需要用户加 Lock 保护共享数据。
生成器推导式使用的括号是( )。
A 中括号 []
B 大括号 {}
C 小括号 ()
D 尖括号 <>
✓ 正确
解析:生成器推导式使用
(),列表推导式用
[],集合/字典推导式用
{}。
在 def 函数内使用 yield 关键字,该函数调用时返回的是( )。
A 最后一个 yield 表达式的值
B 一个生成器对象
C 一个迭代完成的列表
D 一个线程对象
✓ 正确
解析:含 yield 的函数是生成器函数,调用时返回生成器对象,不立即执行函数体。
在 Python 3.5+ 中,定义协程函数使用的关键字是( )。
A def
B async def
C coroutine def
D await def
✓ 正确
解析:Python 3.5+ 使用
async def 定义协程函数。
三、简答题(5 题)
简述使用 threading 模块创建并启动多个线程的三个核心步骤,并说明要注意什么。
查看参考答案 ▼
三大步骤:① import threading;② 创建线程对象 t = threading.Thread(target=函数名, args=(...), kwargs={...});③ t.start()。注意:target 传函数名不要加括号;多线程建议写在 if __name__ == '__main__': 下。
✓ 正确
参考答案:三大步骤:①
import threading;② 创建线程对象
t = threading.Thread(target=函数名, args=(...), kwargs={...});③
t.start()。注意:
target 传函数名
不要加括号;多线程建议写在
if __name__ == '__main__': 下。
为什么线程之间共享全局变量可能导致数据错误?互斥锁如何解决该问题?
查看参考答案 ▼
多线程同时读写全局变量时,可能出现"读—改—写"交错执行,导致实际更新丢失。互斥锁保证同一时刻只有一个线程能进入上锁区域,避免交错修改。
✓ 正确
参考答案:多线程同时读写全局变量时,可能出现"读—改—写"交错执行,导致实际更新丢失。互斥锁保证
同一时刻只有一个线程能进入上锁区域,避免交错修改。
解释什么是生成器?生成器有哪两种创建方式?yield 关键字在生成器函数中有什么作用?
查看参考答案 ▼
生成器是按规则逐个生成数据的机制,不一次性全部生成,可节省内存。两种创建方式:生成器推导式 (表达式 for 变量 in 可迭代对象) 和含 yield 的函数。yield 的作用:① 返回后面的值;② 暂停函数执行,下次从暂停处继续。
✓ 正确
参考答案:生成器是按规则
逐个生成数据的机制,不一次性全部生成,可
节省内存。两种创建方式:生成器推导式
(表达式 for 变量 in 可迭代对象) 和含
yield 的函数。
yield 的作用:① 返回后面的值;②
暂停函数执行,下次从暂停处继续。
什么是协程?它和生成器存在怎样的发展关系?协程的三要素分别是什么?
查看参考答案 ▼
协程是协作式并发的执行单元。Python 协程由生成器发展而来:最初用 yield 暂停和恢复,后引入 yield from,最终发展为 async/await 语法。协程三要素:① async def;② await;③ asyncio.run()。
✓ 正确
参考答案:协程是
协作式并发的执行单元。Python 协程由生成器发展而来:最初用
yield 暂停和恢复,后引入
yield from,最终发展为
async/await 语法。协程三要素:①
async def;②
await;③
asyncio.run()。
从 Python 的角度,比较进程、线程和协程在资源开销、数据共享、适用场景上的主要区别。
查看参考答案 ▼
进程:资源开销大,进程间不共享全局变量,可利用多核并行,稳定性好。线程:资源开销较小,线程间共享全局变量(需注意同步),受 GIL 限制不适合 CPU 密集型计算。协程:运行在单线程内,通过事件循环调度,资源开销极小,适合大量 I/O 密集任务的并发。
✓ 正确
参考答案:进程:资源开销大,进程间不共享全局变量,可利用多核并行,稳定性好。
线程:资源开销较小,线程间共享全局变量(需注意同步),受 GIL 限制不适合 CPU 密集型计算。
协程:运行在单线程内,通过事件循环调度,资源开销极小,适合大量 I/O 密集任务的并发。
四、代码实战(7 题)
多线程基础练习
导入 threading 模块。
定义两个函数 print_numbers() 和 print_letters(),分别输出 1~5 和 A~E。
分别创建两个线程执行上述两个函数,并启动线程。
多次运行,观察输出顺序是否每次相同。
查看参考答案 ▼
import threading
import time
def print_numbers():
for i in range(1, 6):
print(i)
time.sleep(0.1)
def print_letters():
for ch in ["A", "B", "C", "D", "E"]:
print(ch)
time.sleep(0.1)
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)
t1.start()
t2.start()
t1.join()
t2.join()
# 观察结论:多次运行数字和字母的输出顺序不固定,体现线程执行的无序性。
✓ 正确
参考答案:import threading
import time
def print_numbers():
for i in range(1, 6):
print(i)
time.sleep(0.1)
def print_letters():
for ch in ["A", "B", "C", "D", "E"]:
print(ch)
time.sleep(0.1)
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)
t1.start()
t2.start()
t1.join()
t2.join()
# 观察结论:多次运行数字和字母的输出顺序不固定,体现线程执行的无序性。
线程传参
定义一个函数 show_info(name, age),打印"姓名:xx,年龄:yy"。
创建两个子线程:一个用 args 元组方式传递参数;另一个用 kwargs 字典方式传递参数。
查看参考答案 ▼
import threading
def show_info(name, age):
print(f"姓名:{name},年龄:{age}")
# args 元组传参
t1 = threading.Thread(target=show_info, args=("张三", 20))
# kwargs 字典传参
t2 = threading.Thread(target=show_info, kwargs={"name": "李四", "age": 22})
t1.start()
t2.start()
t1.join()
t2.join()
✓ 正确
参考答案:import threading
def show_info(name, age):
print(f"姓名:{name},年龄:{age}")
# args 元组传参
t1 = threading.Thread(target=show_info, args=("张三", 20))
# kwargs 字典传参
t2 = threading.Thread(target=show_info, kwargs={"name": "李四", "age": 22})
t1.start()
t2.start()
t1.join()
t2.join()
互斥锁解决数据竞争
定义全局变量 counter = 0。
创建互斥锁 lock = threading.Lock()。
定义 increase() 循环 100000 次加 1,使用互斥锁保证安全。
创建两个子线程执行,等待结束后打印 counter。
查看参考答案 ▼
import threading
counter = 0
lock = threading.Lock()
def increase():
global counter
for _ in range(100000):
lock.acquire()
counter += 1
lock.release()
t1 = threading.Thread(target=increase)
t2 = threading.Thread(target=increase)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终计数值:{counter}") # 预期 200000
✓ 正确
参考答案:import threading
counter = 0
lock = threading.Lock()
def increase():
global counter
for _ in range(100000):
lock.acquire()
counter += 1
lock.release()
t1 = threading.Thread(target=increase)
t2 = threading.Thread(target=increase)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终计数值:{counter}") # 预期 200000
生成器函数
定义生成器函数 fibonacci(n),生成前 n 个斐波那契数。
用 for 循环遍历生成器并打印。
再用 next() 手动获取前三个值。
主动尝试取值超过上限时观察 StopIteration 异常。
查看参考答案 ▼
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# for 循环遍历
for num in fibonacci(10):
print(num, end=" ")
print()
# 手动使用 next
gen = fibonacci(10)
print(next(gen)) # 0
print(next(gen)) # 1
print(next(gen)) # 1
# 连续调用 10 次后再 next 会抛出 StopIteration
✓ 正确
参考答案:def fibonacci(n):
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# for 循环遍历
for num in fibonacci(10):
print(num, end=" ")
print()
# 手动使用 next
gen = fibonacci(10)
print(next(gen)) # 0
print(next(gen)) # 1
print(next(gen)) # 1
# 连续调用 10 次后再 next 会抛出 StopIteration
协程并发
使用 asyncio 模块。
定义协程函数 task(name, delay),输出"开始"→ sleep →"结束"。
定义 main(),用 create_task 创建两个任务,await 等待完成。
用 asyncio.run(main()) 启动。观察并发效果。
查看参考答案 ▼
import asyncio
import time
async def task(name, delay):
print(f"开始:{name}")
await asyncio.sleep(delay)
print(f"结束:{name}")
async def main():
t1 = asyncio.create_task(task("A", 1))
t2 = asyncio.create_task(task("B", 1))
await t1
await t2
start = time.time()
asyncio.run(main())
print(f"总耗时:{time.time() - start:.2f}秒")
# 观察:两个任务几乎同时完成,总耗时约1秒
✓ 正确
参考答案:import asyncio
import time
async def task(name, delay):
print(f"开始:{name}")
await asyncio.sleep(delay)
print(f"结束:{name}")
async def main():
t1 = asyncio.create_task(task("A", 1))
t2 = asyncio.create_task(task("B", 1))
await t1
await t2
start = time.time()
asyncio.run(main())
print(f"总耗时:{time.time() - start:.2f}秒")
# 观察:两个任务几乎同时完成,总耗时约1秒
线程安全卖票系统(挑战)
共有 10 张票,多个线程模拟售卖窗口。
每个线程循环卖票,每次 1~3 张,若剩余不够则卖完为止。
使用互斥锁防止超卖。
最终输出每个窗口卖出的票数。
查看参考答案 ▼
import threading
import random
import time
tickets = 10
lock = threading.Lock()
def sell(window_name):
global tickets
sold = 0
while True:
lock.acquire()
if tickets <= 0:
lock.release()
break
sell_num = min(random.randint(1, 3), tickets)
tickets -= sell_num
sold += sell_num
print(f"{window_name} 卖出了 {sell_num} 张票,剩余 {tickets} 张")
lock.release()
time.sleep(0.1)
print(f"{window_name} 总共卖出 {sold} 张票")
threads = []
for i in range(3):
t = threading.Thread(target=sell, args=(f"窗口{i+1}",))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有票已售罄或卖完")
✓ 正确
参考答案:import threading
import random
import time
tickets = 10
lock = threading.Lock()
def sell(window_name):
global tickets
sold = 0
while True:
lock.acquire()
if tickets <= 0:
lock.release()
break
sell_num = min(random.randint(1, 3), tickets)
tickets -= sell_num
sold += sell_num
print(f"{window_name} 卖出了 {sell_num} 张票,剩余 {tickets} 张")
lock.release()
time.sleep(0.1)
print(f"{window_name} 总共卖出 {sold} 张票")
threads = []
for i in range(3):
t = threading.Thread(target=sell, args=(f"窗口{i+1}",))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有票已售罄或卖完")
无限素数生成器(挑战)
编写生成器函数 prime_generator(),无限生成素数(从 2 开始)。
使用循环输出前 10 个素数。
再用 next() 获取第 11 个素数。
查看参考答案 ▼
def prime_generator():
num = 2
while True:
is_prime = True
for i in range(2, int(num**0.5) + 1):
if num % i == 0:
is_prime = False
break
if is_prime:
yield num
num += 1
# 输出前10个素数
gen = prime_generator()
print("前10个素数:", end="")
for _ in range(10):
print(next(gen), end=" ")
print()
# 获取第11个素数
print(f"第11个素数:{next(gen)}")
✓ 正确
参考答案:def prime_generator():
num = 2
while True:
is_prime = True
for i in range(2, int(num**0.5) + 1):
if num % i == 0:
is_prime = False
break
if is_prime:
yield num
num += 1
# 输出前10个素数
gen = prime_generator()
print("前10个素数:", end="")
for _ in range(10):
print(next(gen), end=" ")
print()
# 获取第11个素数
print(f"第11个素数:{next(gen)}")