🐍 Python 第四课:王者之路——高阶特性与实战开发
课程目标:
- 掌握生成器与迭代器,写出省内存的高效代码
- 学会装饰器,给函数"开挂"
- 理解上下文管理器,让资源管理更优雅
- 初探并发编程,让程序快如闪电
- 数据库操作,让数据有个家
- Web 开发入门,做个能访问的网站
1) 回顾上节课
上节课我们学会了:
- 文件读写与 JSON 处理
- 模块、包与虚拟环境
- 面向对象编程:类、继承、封装、魔术方法
- 实战:学生管理系统
常见问题解答:
Q:self 到底是个啥?
A:self 就是对象自己。调用 dog.bark() 时,Python 自动把 dog 作为第一个参数传给 bark(self),所以 self.name 就是 dog.name。
Q:继承时 super() 是干嘛的?
A:调用父类的方法。比如子类构造方法里 super().__init__() 就是执行父类的构造,避免重复写代码。
2) 生成器与迭代器:省内存的神器
迭代器(Iterator)
迭代器是"懒加载"的代表,一次只给你一个元素,而不是一次性全塞给你。
# 列表是一次性生成所有元素,占内存
big_list = [i for i in range(10000000)] # 内存爆炸!
# 迭代器:用的时候再生成
big_iter = iter(range(10000000)) # 几乎不占内存
print(next(big_iter)) # 0
print(next(big_iter)) # 1
# ... 要一个给一个自定义迭代器:
class CountDown:
def __init__(self, start):
self.start = start
def __iter__(self):
return self
def __next__(self):
if self.start <= 0:
raise StopIteration # 迭代结束的信号
self.start -= 1
return self.start + 1
# 使用
for num in CountDown(5):
print(num) # 5, 4, 3, 2, 1生成器(Generator):更简单的迭代器
写 __iter__ 和 __next__ 太麻烦?用 yield!
def countdown(start):
while start > 0:
yield start # 暂停在这里,返回 start,下次从这里继续
start -= 1
# 使用
for num in countdown(5):
print(num) # 5, 4, 3, 2, 1
# 本质
gen = countdown(3)
print(next(gen)) # 5
print(next(gen)) # 4
# ...生成器表达式(类比列表推导式):
# 列表推导式:一次性生成所有,占内存
squares_list = [x**2 for x in range(1000000)]
# 生成器表达式:惰性求值,省内存
squares_gen = (x**2 for x in range(1000000))
# 使用时才计算
for sq in squares_gen:
if sq > 100:
break # 后面的根本不生成,省了大量计算实际应用:读取大文件
def read_large_file(file_path):
"""逐行读取大文件,不会内存溢出"""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip() # 一次只处理一行
# 处理 10GB 的日志文件也毫无压力
for line in read_large_file('huge_log.txt'):
process(line) # 处理每一行3) 装饰器:给函数"穿装备"
什么是装饰器?
装饰器就是一个接收函数作为参数,返回函数的高阶函数。用来在不修改原函数的情况下,增加额外功能。
import time
def timer(func):
"""计算函数运行时间的装饰器"""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs) # 执行原函数
end = time.time()
print(f"{func.__name__} 耗时:{end - start:.4f}秒")
return result
return wrapper
# 使用装饰器
@timer # 等价于:slow_function = timer(slow_function)
def slow_function():
time.sleep(1)
print("函数执行完毕")
slow_function()
# 输出:
# 函数执行完毕
# slow_function 耗时:1.0012秒执行流程:
@timer把slow_function传给timertimer返回wrapper- 调用
slow_function()实际是调用wrapper() wrapper里先计时,再执行原函数,再计时,返回结果
带参数的装饰器
def repeat(n):
"""重复执行 n 次的装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
for i in range(n):
print(f"第 {i+1} 次执行:")
result = func(*args, **kwargs)
return result
return wrapper
return decorator
@repeat(3)
def greet(name):
print(f"你好,{name}!")
greet("张三")
# 输出3次"你好,张三!"常用内置装饰器
class MyClass:
@staticmethod
def static_method():
"""静态方法:与类和实例都无关,就是个普通函数放在类里"""
print("我是静态方法")
@classmethod
def class_method(cls):
"""类方法:第一个参数是类本身,可以访问类属性"""
print(f"我是类方法,属于 {cls}")
@property
def value(self):
"""属性装饰器:把方法变成属性访问"""
return self._value
@value.setter
def value(self, val):
"""设置属性时的验证"""
if val < 0:
raise ValueError("不能为负数")
self._value = val
# 使用
MyClass.static_method() # 不用实例化
MyClass.class_method() # 不用实例化
obj = MyClass()
obj.value = 100 # 像属性一样赋值,实际调用 setter
print(obj.value) # 像属性一样访问,实际调用 getter类装饰器
用类来实现装饰器,更灵活:
class CountCalls:
"""统计函数被调用次数"""
def __init__(self, func):
self.func = func
self.count = 0
def __call__(self, *args, **kwargs):
self.count += 1
print(f"{self.func.__name__} 被调用了 {self.count} 次")
return self.func(*args, **kwargs)
@CountCalls
def say_hello():
print("Hello!")
say_hello() # 次数:1
say_hello() # 次数:24) 上下文管理器:with 语句的秘密
自定义上下文管理器
我们已经知道 with open(...) as f: 好用,现在自己实现:
from contextlib import contextmanager
class DatabaseConnection:
"""模拟数据库连接"""
def __init__(self, config):
self.config = config
def __enter__(self):
"""进入 with 时执行"""
print(f"连接数据库:{self.config}")
self.conn = "连接对象" # 模拟连接
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出 with 时执行,无论是否异常"""
print("关闭数据库连接")
# 返回 True 表示异常已处理,不向外传播
# 返回 False 或不返回,异常会继续抛出
# 使用
with DatabaseConnection("localhost:3306") as conn:
print(f"使用 {conn} 执行查询")
# 输出:
# 连接数据库:localhost:3306
# 使用 连接对象 执行查询
# 关闭数据库连接用装饰器快速创建
from contextlib import contextmanager
@contextmanager
def managed_resource(name):
"""更简洁的写法"""
print(f"获取资源:{name}")
resource = f"资源-{name}"
try:
yield resource # 这里返回给 as 变量
finally:
print(f"释放资源:{name}")
# 使用
with managed_resource("数据库") as res:
print(f"使用 {res}")实际应用:计时、锁、事务等
from contextlib import contextmanager
import time
@contextmanager
def timer_context(name):
start = time.time()
yield
end = time.time()
print(f"{name} 耗时:{end - start:.4f}秒")
with timer_context("数据处理"):
time.sleep(1)
print("处理中...")5) 并发编程:让程序快如闪电
多线程(Threading):I/O 密集型任务
适合网络请求、文件读写等等待时间长的场景。
import threading
import time
def download(url):
print(f"开始下载 {url}")
time.sleep(2) # 模拟下载
print(f"{url} 下载完成")
urls = ["url1", "url2", "url3", "url4"]
# 串行下载:8秒
# for url in urls:
# download(url)
# 多线程下载:约2秒
threads = []
for url in urls:
t = threading.Thread(target=download, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join() # 等待所有线程完成
print("全部下载完成!")线程池(推荐):
from concurrent.futures import ThreadPoolExecutor
def fetch_data(url):
# 模拟网络请求
time.sleep(1)
return f"数据来自 {url}"
urls = ["api1", "api2", "api3", "api4", "api5"]
with ThreadPoolExecutor(max_workers=3) as executor: # 最多3个线程
results = executor.map(fetch_data, urls) # 自动分配任务
for result in results:
print(result)注意: Python 有 GIL(全局解释器锁),多线程不能真正并行计算,只是并发(交替执行)。
多进程(Multiprocessing):CPU 密集型任务
适合计算密集型,利用多核 CPU。
from multiprocessing import Pool
import os
def heavy_computation(n):
"""耗时计算"""
print(f"进程 {os.getpid()} 处理 {n}")
result = sum(i * i for i in range(n))
return result
numbers = [1000000, 2000000, 3000000, 4000000]
with Pool(processes=4) as pool: # 4个进程
results = pool.map(heavy_computation, numbers)
print(results)异步 IO(Asyncio):高性能并发
Python 3.4+ 引入,真正的异步非阻塞,性能极高。
import asyncio
async def fetch_data(url):
print(f"开始获取 {url}")
await asyncio.sleep(1) # 模拟异步等待,不会阻塞其他任务
print(f"完成 {url}")
return f"数据-{url}"
async def main():
# 创建多个任务并发执行
tasks = [
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3")
]
results = await asyncio.gather(*tasks) # 等待所有完成
print(results)
# 运行
asyncio.run(main())异步 Web 请求(aiohttp):
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["http://api1", "http://api2", "http://api3"]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"获取了 {len(results)} 个页面")
asyncio.run(main())选择指南:
| 场景 | 方案 | 原因 |
|---|---|---|
| 网络请求、文件 I/O | 多线程 / 异步 | 等待时间长,切换开销小 |
| 大量计算 | 多进程 | 绕过 GIL,利用多核 |
| 超高并发(万级) | 异步 IO | 资源占用最少,性能最高 |
6) 数据库操作:SQLite 与 SQLAlchemy
SQLite:零配置数据库
Python 内置,无需安装,适合小型应用。
import sqlite3
# 连接数据库(不存在则创建)
conn = sqlite3.connect('school.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
score REAL
)
''')
# 插入数据
cursor.execute("INSERT INTO students (name, age, score) VALUES (?, ?, ?)",
("张三", 20, 85.5))
cursor.execute("INSERT INTO students (name, age, score) VALUES (?, ?, ?)",
("李四", 21, 92.0))
# 查询
cursor.execute("SELECT * FROM students WHERE score > ?", (90,))
rows = cursor.fetchall()
for row in rows:
print(row) # (2, '李四', 21, 92.0)
# 提交并关闭
conn.commit()
conn.close()使用上下文管理器:
with sqlite3.connect('school.db') as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM students")
print(cursor.fetchall())
# 自动 commit 和 closeSQLAlchemy:ORM 神器
用 Python 代码操作数据库,不用写 SQL。
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
age = Column(Integer)
score = Column(Float)
# 连接数据库
engine = create_engine('sqlite:///school.db', echo=True)
Base.metadata.create_all(engine) # 创建表
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 增
new_student = Student(name="王五", age=22, score=88.5)
session.add(new_student)
session.commit()
# 查
students = session.query(Student).filter(Student.score > 85).all()
for s in students:
print(f"{s.name}: {s.score}")
# 改
student = session.query(Student).filter_by(name="张三").first()
student.score = 90.0
session.commit()
# 删
session.delete(student)
session.commit()
session.close()关系示例:一对多
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Class(Base):
__tablename__ = 'classes'
id = Column(Integer, primary_key=True)
name = Column(String(50))
students = relationship("Student", back_populates="class_")
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String(50))
class_id = Column(Integer, ForeignKey('classes.id'))
class_ = relationship("Class", back_populates="students")
# 使用
cls = session.query(Class).first()
print(f"班级 {cls.name} 有 {len(cls.students)} 名学生")7) Web 开发入门:Flask 框架
快速开始
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/')
def home():
return "Hello, Flask!"
@app.route('/api/students', methods=['GET'])
def get_students():
students = [
{"id": 1, "name": "张三", "score": 85},
{"id": 2, "name": "李四", "score": 92}
]
return jsonify(students)
@app.route('/api/students', methods=['POST'])
def add_student():
data = request.get_json()
# 这里应该保存到数据库
return jsonify({"message": "添加成功", "data": data}), 201
@app.route('/api/students/<int:id>', methods=['GET'])
def get_student(id):
return jsonify({"id": id, "name": "张三", "score": 85})
if __name__ == '__main__':
app.run(debug=True, port=5000)运行:
python app.py
# 访问 http://localhost:5000/集成数据库的完整示例
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///students.db'
db = SQLAlchemy(app)
class Student(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), nullable=False)
age = db.Column(db.Integer)
score = db.Column(db.Float)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'age': self.age,
'score': self.score
}
# 初始化数据库
with app.app_context():
db.create_all()
@app.route('/api/students', methods=['GET'])
def list_students():
students = Student.query.all()
return jsonify([s.to_dict() for s in students])
@app.route('/api/students', methods=['POST'])
def create_student():
data = request.get_json()
student = Student(
name=data['name'],
age=data.get('age'),
score=data.get('score')
)
db.session.add(student)
db.session.commit()
return jsonify(student.to_dict()), 201
@app.route('/api/students/<int:id>', methods=['GET'])
def get_student(id):
student = Student.query.get_or_404(id)
return jsonify(student.to_dict())
@app.route('/api/students/<int:id>', methods=['PUT'])
def update_student(id):
student = Student.query.get_or_404(id)
data = request.get_json()
student.name = data.get('name', student.name)
student.age = data.get('age', student.age)
student.score = data.get('score', student.score)
db.session.commit()
return jsonify(student.to_dict())
@app.route('/api/students/<int:id>', methods=['DELETE'])
def delete_student(id):
student = Student.query.get_or_404(id)
db.session.delete(student)
db.session.commit()
return '', 204
if __name__ == '__main__':
app.run(debug=True)测试 API:
# 获取所有学生
curl http://localhost:5000/api/students
# 添加学生
curl -X POST -H "Content-Type: application/json" \
-d '{"name":"王五","age":22,"score":88}' \
http://localhost:5000/api/students
# 更新学生
curl -X PUT -H "Content-Type: application/json" \
-d '{"score":95}' \
http://localhost:5000/api/students/1
# 删除学生
curl -X DELETE http://localhost:5000/api/students/18) 实战项目:异步爬虫 + Web 展示
做一个天气数据爬虫,存储到数据库,用 Web 展示。
项目结构:
weather_app/
├── crawler.py # 异步爬虫
├── models.py # 数据库模型
├── app.py # Flask Web
└── templates/
└── index.html # 网页模板crawler.py:
import aiohttp
import asyncio
from datetime import datetime
async def fetch_weather(city):
"""模拟爬取天气(实际应调用天气 API)"""
async with aiohttp.ClientSession() as session:
# 这里应该是真实的天气 API
# async with session.get(f"https://api.weather.com/{city}") as resp:
# return await resp.json()
# 模拟数据
await asyncio.sleep(0.5) # 模拟网络延迟
return {
"city": city,
"temperature": 20 + hash(city) % 15, # 伪随机温度
"humidity": 40 + hash(city) % 40,
"updated_at": datetime.now().isoformat()
}
async def crawl_cities(cities):
"""并发爬取多个城市"""
tasks = [fetch_weather(city) for city in cities]
return await asyncio.gather(*tasks)
# 使用
if __name__ == "__main__":
cities = ["北京", "上海", "广州", "深圳", "杭州", "成都"]
results = asyncio.run(crawl_cities(cities))
for r in results:
print(r)models.py:
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class WeatherRecord(db.Model):
id = db.Column(db.Integer, primary_key=True)
city = db.Column(db.String(50), nullable=False)
temperature = db.Column(db.Float)
humidity = db.Column(db.Integer)
updated_at = db.Column(db.DateTime)
def to_dict(self):
return {
'city': self.city,
'temperature': self.temperature,
'humidity': self.humidity,
'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}app.py:
from flask import Flask, render_template, jsonify
from models import db, WeatherRecord
from crawler import crawl_cities
import asyncio
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///weather.db'
db.init_app(app)
with app.app_context():
db.create_all()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/weather')
def get_weather():
records = WeatherRecord.query.order_by(WeatherRecord.updated_at.desc()).all()
return jsonify([r.to_dict() for r in records])
@app.route('/api/update', methods=['POST'])
def update_weather():
cities = ["北京", "上海", "广州", "深圳", "杭州"]
data = asyncio.run(crawl_cities(cities))
# 保存到数据库
for item in data:
record = WeatherRecord(
city=item['city'],
temperature=item['temperature'],
humidity=item['humidity'],
updated_at=datetime.fromisoformat(item['updated_at'])
)
db.session.add(record)
db.session.commit()
return jsonify({"message": "更新成功", "count": len(data)})
if __name__ == '__main__':
app.run(debug=True)templates/index.html:
<!DOCTYPE html>
<html>
<head>
<title>天气看板</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.card {
display: inline-block;
border: 1px solid #ddd;
border-radius: 8px;
padding: 20px;
margin: 10px;
min-width: 150px;
}
.city { font-size: 24px; font-weight: bold; }
.temp { font-size: 36px; color: #ff6b6b; }
button {
padding: 10px 20px;
font-size: 16px;
cursor: pointer;
background: #4ecdc4;
border: none;
border-radius: 4px;
color: white;
}
</style>
</head>
<body>
<h1>🌤️ 实时天气看板</h1>
<button onclick="updateWeather()">更新数据</button>
<div id="weather-container"></div>
<script>
async function loadWeather() {
const resp = await fetch('/api/weather');
const data = await resp.json();
const container = document.getElementById('weather-container');
container.innerHTML = data.map(w => `
<div class="card">
<div class="city">${w.city}</div>
<div class="temp">${w.temperature}°C</div>
<div>湿度: ${w.humidity}%</div>
<small>${w.updated_at}</small>
</div>
`).join('');
}
async function updateWeather() {
const btn = document.querySelector('button');
btn.textContent = '更新中...';
await fetch('/api/update', {method: 'POST'});
await loadWeather();
btn.textContent = '更新数据';
}
loadWeather();
</script>
</body>
</html>9) 下节课预告:王者到传说
下节课我们将学习:
- 测试驱动开发(TDD):pytest 单元测试
- 代码质量:类型提示、代码格式化、linting
- 部署上线:Docker 容器化、云服务器部署
- 性能优化:Profiling、缓存策略
- 设计模式:单例、工厂、观察者等
- 大型项目架构:MVC、分层架构、微服务入门
10) 学习建议
- 异步是难点:多写多练,理解事件循环机制
- 数据库要熟练:SQL 是基础,ORM 是效率
- Web 开发是必经之路:理解 HTTP、RESTful API
- 读框架源码:Flask、FastAPI 的源码并不复杂,学习优秀设计
- 做完整项目:从零开始做一个能用的东西,遇到问题解决问题
记住: 高阶特性是工具,不是炫技。选择合适的工具解决合适的问题,才是高手。
遇到问题? 百度一下,或者看官方文档。Python 的文档是业界标杆,非常详细。
祝编程愉快!May your code be bug-free! 🐍
课后作业:
- 用生成器重写文件读取功能,处理 10GB 日志文件
- 写个装饰器,自动重试失败的数据库操作(最多3次)
- 用 asyncio 并发请求 100 个 URL,统计成功率
- 完善天气应用:添加历史趋势图、城市搜索、邮件预警
下节课见!
发表评论
评论列表