模仿题 1:PyMySQL 查询 AI 工具
连接 ai_work_demo 数据库,查询 ai_tools 表中所有工具的 name 和 status。
查看参考答案 ▼
import pymysql
# Query every tool's name and status from ai_tools and print one line per tool.
conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",  # demo credential; real code should read it from config/env
    database="ai_work_demo",
    charset="utf8mb4",
)
try:
    # The cursor is opened inside the try block so the connection is still
    # closed if cursor creation fails; `with` closes the cursor for us.
    with conn.cursor() as cursor:
        cursor.execute("SELECT name, status FROM ai_tools;")
        rows = cursor.fetchall()
    for name, status in rows:
        # A status of 1 is reported as enabled; any other value as not enabled.
        status_text = "已启用" if status == 1 else "未启用"
        print(f"工具:{name},状态:{status_text}")
finally:
    conn.close()
✓ 正确
参考答案:
import pymysql
# Query every tool's name and status from ai_tools and print one line per tool.
conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",  # demo credential; real code should read it from config/env
    database="ai_work_demo",
    charset="utf8mb4",
)
try:
    # The cursor is opened inside the try block so the connection is still
    # closed if cursor creation fails; `with` closes the cursor for us.
    with conn.cursor() as cursor:
        cursor.execute("SELECT name, status FROM ai_tools;")
        rows = cursor.fetchall()
    for name, status in rows:
        # A status of 1 is reported as enabled; any other value as not enabled.
        status_text = "已启用" if status == 1 else "未启用"
        print(f"工具:{name},状态:{status_text}")
finally:
    conn.close()
模仿题 2:redis-py 连接测试
使用 redis-py 连接本机 Redis,执行 ping(),并设置 day08:name 为张三。
查看参考答案 ▼
import redis
# Connect to the local Redis server, check connectivity, then write and read
# back the key day08:name.
r = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
)
try:
    print(r.ping())
    r.set("day08:name", "张三")
    print(r.get("day08:name"))
finally:
    # Always release the client, even if a command above raised.
    r.close()
✓ 正确
参考答案:
import redis
# Connect to the local Redis server, check connectivity, then write and read
# back the key day08:name.
r = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
)
try:
    print(r.ping())
    r.set("day08:name", "张三")
    print(r.get("day08:name"))
finally:
    # Always release the client, even if a command above raised.
    r.close()
模仿题 3:Redis String 计数器
使用 Redis String 记录文章 1001 的阅读数,初始化为 0,然后自增 3 次。
查看参考答案 ▼
import redis
# Track the view count of article 1001 in a Redis string: reset it to 0,
# increment it three times, then print the stored value.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    view_key = "day08:article:1001:views"
    r.set(view_key, "0")
    # One INCR per simulated view; a loop replaces three copy-pasted calls.
    for _ in range(3):
        r.incr(view_key)
    print("当前阅读数:", r.get(view_key))
finally:
    r.close()
✓ 正确
参考答案:
import redis
# Track the view count of article 1001 in a Redis string: reset it to 0,
# increment it three times, then print the stored value.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    view_key = "day08:article:1001:views"
    r.set(view_key, "0")
    # One INCR per simulated view; a loop replaces three copy-pasted calls.
    for _ in range(3):
        r.incr(view_key)
    print("当前阅读数:", r.get(view_key))
finally:
    r.close()
变体题 1:PyMySQL 启用 LangChain
把 MySQL 中 LangChain 的状态从未启用改成已启用,要求使用 commit() 和 rollback()。
查看参考答案 ▼
import pymysql
# Flip LangChain's status from 0 to 1 inside a transaction: commit when
# exactly one row was changed, otherwise roll back and report the failure.
conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",  # demo credential; real code should read it from config/env
    database="ai_work_demo",
    charset="utf8mb4",
)
try:
    # Opening the cursor inside the try block means a failure here is also
    # reported and the connection is still closed in `finally`.
    with conn.cursor() as cursor:
        sql = "UPDATE ai_tools SET status = %s WHERE name = %s AND status = %s;"
        params = (1, "LangChain", 0)
        # execute() returns the number of affected rows for this statement.
        row_count = cursor.execute(sql, params)
        if row_count != 1:
            # A specific exception type instead of a bare Exception; it is
            # still handled by the except clause below.
            raise RuntimeError(f"期望修改 1 行,实际修改了 {row_count} 行")
    conn.commit()
    print("LangChain 已启用")
except Exception as error:
    # Script-level boundary: undo any partial change and report the reason.
    conn.rollback()
    print("启用失败,已回滚:", error)
finally:
    conn.close()
✓ 正确
参考答案:
import pymysql
# Flip LangChain's status from 0 to 1 inside a transaction: commit when
# exactly one row was changed, otherwise roll back and report the failure.
conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",  # demo credential; real code should read it from config/env
    database="ai_work_demo",
    charset="utf8mb4",
)
try:
    # Opening the cursor inside the try block means a failure here is also
    # reported and the connection is still closed in `finally`.
    with conn.cursor() as cursor:
        sql = "UPDATE ai_tools SET status = %s WHERE name = %s AND status = %s;"
        params = (1, "LangChain", 0)
        # execute() returns the number of affected rows for this statement.
        row_count = cursor.execute(sql, params)
        if row_count != 1:
            # A specific exception type instead of a bare Exception; it is
            # still handled by the except clause below.
            raise RuntimeError(f"期望修改 1 行,实际修改了 {row_count} 行")
    conn.commit()
    print("LangChain 已启用")
except Exception as error:
    # Script-level boundary: undo any partial change and report the reason.
    conn.rollback()
    print("启用失败,已回滚:", error)
finally:
    conn.close()
变体题 2:Redis Hash 存储用户信息
使用 Redis Hash 保存用户 user:2001 的 name、age、city,并打印完整用户信息。
查看参考答案 ▼
import redis
# Store the name, age and city of user 2001 in a Redis hash and print the
# whole hash back.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    user_key = "day08:user:2001"
    # Start from an empty key so fields left behind by an earlier run cannot
    # show up in the printed result.
    r.delete(user_key)
    r.hset(
        user_key,
        mapping={
            "name": "王五",
            "age": "28",
            "city": "广州",
        },
    )
    print(r.hgetall(user_key))
finally:
    r.close()
✓ 正确
参考答案:
import redis
# Store the name, age and city of user 2001 in a Redis hash and print the
# whole hash back.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    user_key = "day08:user:2001"
    # Start from an empty key so fields left behind by an earlier run cannot
    # show up in the printed result.
    r.delete(user_key)
    r.hset(
        user_key,
        mapping={
            "name": "王五",
            "age": "28",
            "city": "广州",
        },
    )
    print(r.hgetall(user_key))
finally:
    r.close()
变体题 3:Redis Set 做标签去重
向 Redis Set 中添加 python、redis、python、ai 四个标签,观察去重效果。
查看参考答案 ▼
import redis
# Add four tags, one of them a duplicate, to a Redis set and print the
# members to show that the duplicate is stored only once.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    tag_key = "day08:tags"
    # Remove any members from a previous run so only this run's tags remain.
    r.delete(tag_key)
    r.sadd(tag_key, "python", "redis", "python", "ai")
    print(r.smembers(tag_key))
finally:
    r.close()
✓ 正确
参考答案:
import redis
# Add four tags, one of them a duplicate, to a Redis set and print the
# members to show that the duplicate is stored only once.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    tag_key = "day08:tags"
    # Remove any members from a previous run so only this run's tags remain.
    r.delete(tag_key)
    r.sadd(tag_key, "python", "redis", "python", "ai")
    print(r.smembers(tag_key))
finally:
    r.close()
综合案例 1:MySQL 查询 + Redis 缓存
先用 PyMySQL 查询 ai_tools 表中已启用的工具;再把查询结果写入 Redis,key 为 day08:enabled_tools,设置 60 秒过期时间。
查看参考答案 ▼
import json
import pymysql
import redis
# Read the enabled tools from MySQL and cache them in Redis as a JSON string
# under day08:enabled_tools with a 60-second expiry.
mysql_conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",  # demo credential; real code should read it from config/env
    database="ai_work_demo",
    charset="utf8mb4",
)
try:
    redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    # Cleanup is nested so that a failure while creating or closing one
    # resource can never skip closing the other.
    try:
        with mysql_conn.cursor() as mysql_cursor:
            sql = "SELECT name, scene FROM ai_tools WHERE status = %s;"
            mysql_cursor.execute(sql, (1,))
            rows = mysql_cursor.fetchall()
        enabled_tools = [{"name": name, "scene": scene} for name, scene in rows]
        cache_key = "day08:enabled_tools"
        # ensure_ascii=False keeps non-ASCII text readable in the cached JSON.
        redis_client.setex(cache_key, 60, json.dumps(enabled_tools, ensure_ascii=False))
        print("已启用工具已写入 Redis 缓存:", enabled_tools)
    finally:
        redis_client.close()
finally:
    mysql_conn.close()
✓ 正确
参考答案:
import json
import pymysql
import redis
# Read the enabled tools from MySQL and cache them in Redis as a JSON string
# under day08:enabled_tools with a 60-second expiry.
mysql_conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",  # demo credential; real code should read it from config/env
    database="ai_work_demo",
    charset="utf8mb4",
)
try:
    redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    # Cleanup is nested so that a failure while creating or closing one
    # resource can never skip closing the other.
    try:
        with mysql_conn.cursor() as mysql_cursor:
            sql = "SELECT name, scene FROM ai_tools WHERE status = %s;"
            mysql_cursor.execute(sql, (1,))
            rows = mysql_cursor.fetchall()
        enabled_tools = [{"name": name, "scene": scene} for name, scene in rows]
        cache_key = "day08:enabled_tools"
        # ensure_ascii=False keeps non-ASCII text readable in the cached JSON.
        redis_client.setex(cache_key, 60, json.dumps(enabled_tools, ensure_ascii=False))
        print("已启用工具已写入 Redis 缓存:", enabled_tools)
    finally:
        redis_client.close()
finally:
    mysql_conn.close()
综合案例 2:Redis 排行榜
使用 Redis Sorted Set 保存 AI 工具热度榜:ChatGPT 1000 分,DeepSeek 900 分,Dify 800 分;查询并打印热度从高到低的排行榜。
查看参考答案 ▼
import redis
# Build an AI-tool popularity ranking in a Redis sorted set and print it
# from the highest score to the lowest.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    rank_key = "day08:ai_tool_hot_rank"
    # Reset the ranking so members from a previous run do not linger.
    r.delete(rank_key)
    r.zadd(
        rank_key,
        {
            "ChatGPT": 1000,
            "DeepSeek": 900,
            "Dify": 800,
        },
    )
    ranks = r.zrevrange(rank_key, 0, -1, withscores=True)
    for index, (tool_name, score) in enumerate(ranks, start=1):
        # Scores come back as floats; format without decimals so the output
        # reads 1000 rather than 1000.0.
        print(f"第 {index} 名:{tool_name},热度:{score:.0f}")
finally:
    r.close()
✓ 正确
参考答案:
import redis
# Build an AI-tool popularity ranking in a Redis sorted set and print it
# from the highest score to the lowest.
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
try:
    rank_key = "day08:ai_tool_hot_rank"
    # Reset the ranking so members from a previous run do not linger.
    r.delete(rank_key)
    r.zadd(
        rank_key,
        {
            "ChatGPT": 1000,
            "DeepSeek": 900,
            "Dify": 800,
        },
    )
    ranks = r.zrevrange(rank_key, 0, -1, withscores=True)
    for index, (tool_name, score) in enumerate(ranks, start=1):
        # Scores come back as floats; format without decimals so the output
        # reads 1000 rather than 1000.0.
        print(f"第 {index} 名:{tool_name},热度:{score:.0f}")
finally:
    r.close()