从 Flask 切到 FastAPI 后,起飞了!

本文翻译自 Moving from Flask to FastAPI, 作者:Amal Shaji

刚好笔者这几天上手体验 FastAPI,感受到这个框架易用和方便。之前也使用过 Python 中的 Django 和 Flask 作为项目的框架。Django 说实话上手也方便,但是学习起来有点重量级框架的感觉,FastAPI 带给我的直观体验还是很轻便的,本文翻译的这篇文章就会着重介绍 FastAPI 和 Flask 的区别。

Python 是最流行的编程语言之一。从脚本到 API 开发再到机器学习,Python 都有着它自己的足迹。因为 Python 注重开发者的体验和其所能提供的大量工具而大受欢迎。网络框架 Flask 就是这样一个工具,它在机器学习社区中很受欢迎。它也被广泛用于 API开发。但是有一个新的框架正在崛起: FastAPI。与 Flask 不同,FastAPI 是一个 ASGI(Asynchronous Server Gateway Interface 异步服务器网关接口)框架。与 Go 和 NodeJS 一样,FastAPI 是最快的基于 Python 的 Web 框架之一。

本文针对那些有兴趣从 Flask 转移到 FastAPI 的人,比较和对比了 Flask 和 FastAPI 的常见模式。

# FastAPI vs Flask

FastAPI 的构建考虑了以下三个主要问题:

  • 速度
  • 开发者经验
  • 开放标准

你可以把 FastAPI 看作是把 Starlette、Pydantic、OpenAPI 和 JSON Schema 粘合在一起的胶水。

  • 本质上说,FastAPI 使用 Pydantic 进行数据验证,并使用 Starlette 作为工具,使其与 Flask 相比快得惊人,具有与 Node 或 Go 中的高速 Web APIs 相同的性能。
  • Starlette + Uvicorn 提供异步请求能力,这是 Flask 所缺乏的。
  • 有了 Pydantic 以及类型提示,你就可以得到一个具有自动完成功能的良好的编辑体验。你还可以得到数据验证、序列化和反序列化(用于构建一个 API),以及自动化文档(通过 JSON Schema 和 OpenAPI )。

也就是说,Flask 的使用更为广泛,所以它经过了实战检验,并且有更大的社区支持它。由于这两个框架都是用来扩展的,Flask 显然是赢家,因为它有庞大的插件生态系统。

建议:

  • 如果你对上述三个问题有共鸣,厌倦了 Flask 扩展时的大量选择,希望利用异步请求,或者只是想建立一个 RESTful API,请使用 FastAPI。
  • 如果你对 FastAPI 的成熟度不满意,需要用服务器端模板构建一个全栈应用,或者离不开一些社区维护的 Flask 扩展,就可以使用 Flask。

# 开始

 安装

与任何其他 Python 包一样,安装非常简单。

Flask

pip install flask

# or
poetry add flask
pipenv install flask
conda install flask

FastAPI

pip install fastapi uvicorn

# or
poetry add fastapi uvicorn
pipenv install fastapi uvicorn
conda install fastapi uvicorn -c conda-forge

与 Flask 不同,FastAPI 没有内置的开发服务器,因此需要像 Uvicorn 或 Daphne 这样的 ASGI 服务器。

 “Hello World” 应用

Flask

# flask_code.py

from flask import Flask

app = Flask(__name__)

@app.route("/")
def home():
    return {"Hello""World"}

if __name__ == "__main__":
    app.run()

FastAPI

# fastapi_code.py

import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def home():
    return {"Hello""World"}

if __name__ == "__main__":
    uvicorn.run("fastapi_code:app")

reload=True 这样的参数可以被传递到 uvicorn.run() 中,以实现开发时的热重载。

或者,您可以直接从终端启动服务器:

uvicorn run fastapi_code:app

热加载模式:

uvicorn run fastapi_code:app --reload

# 配置

Flask 和 FastAPI 都提供了许多选项来处理不同环境的不同配置。两者都支持以下模式:

  1. 环境变量
  2. 配置文件
  3. 实例文件夹
  4. 类和继承

有关更多信息,请参阅其各自的文档:

  • Flask: https://flask.palletsprojects.com/en/2.0.x/config/
  • FastAPI:https://fastapi.tiangolo.com/advanced/settings/

Flask

import os
from flask import Flask

class Config(object):
    MESSAGE = os.environ.get("MESSAGE")

app = Flask(__name__)
app.config.from_object(Config)

@app.route("/settings")
def get_settings():
    return { "message": app.config["MESSAGE"] }

if __name__ == "__main__":
    app.run()

现在,在你运行服务器之前,设置适当的环境变量:

export MESSAGE="hello, world"

FastAPI

import uvicorn
from fastapi import FastAPI
from pydantic import BaseSettings

class Settings(BaseSettings):
    message: str

settings = Settings()
app = FastAPI()

@app.get("/settings")
def get_settings():
    return { "message": settings.message }

if __name__ == "__main__":
    uvicorn.run("fastapi_code:app")

同样,在运行服务器之前,设置适当的环境变量:

export MESSAGE="hello, world"

# 路由, 模板和视图

 HTTP 方法

Flask

from flask import request

@app.route("/", methods=["GET", "POST"])
def home():
    # handle POST
    if request.method == "POST":
        return {"Hello""POST"}
    # handle GET
    return {"Hello""GET"}

FastAPI

@app.get("/")
def home():
    return {"Hello""GET"}

@app.post("/")
def home_post():
    return {"Hello""POST"}

FastAPI 为每个方法提供单独的装饰器:

@app.get("/")
@app.post("/")
@app.delete("/")
@app.patch("/")

 URL 参数

通过 URL(如 /employee/1 )传递信息以管理状态:

Flask

@app.route("/employee/<int:id>")
def home():
    return {"id": id}

FastAPI

@app.get("/employee/{id}")
def home(id: int):
    return {"id": id}

URL参数的指定类似于一个 f-string 表达式。此外,你还可以利用类型提示。这里,我们在运行时告诉 Pydantic, idint 类型的。在开发中,这也可以帮助完成更好的代码完成度。

 查询参数

与 URL 参数一样,查询参数(如 /employee?department=sales )也可用于管理状态(通常用于过滤或排序):

Flask

from flask import request

@app.route("/employee")
def home():
    department = request.args.get("department")
    return {"department": department}

FastAPI

@app.get("/employee")
def home(department: str):
    return {"department": department}

 模板

Flask

from flask import render_template

@app.route("/")
def home():
    return render_template("index.html")

默认情况下,Flask会在 “templates “文件夹中寻找模板。

FastAPI

你需要安装 Jinja:

pip install jinja2

实现:

from fastapi import Request
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse

app = FastAPI()

templates = Jinja2Templates(directory="templates")

@app.get("/", response_class=HTMLResponse)
def home(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

对于 FastAPI,你需要明确地定义 “模板 “文件夹。然后对于每个响应,需要提供请求上下文。

 静态文件

Flask

默认情况下,Flask 从“static”文件夹中提供静态文件。

FastAPI

在 FastAPI 中,需要为静态文件挂载一个文件夹:

from fastapi.staticfiles import StaticFiles

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")

 异步任务

Flask

从 Flask 2.0 开始,您可以使用 async/await 创建异步路由处理程序:

@app.route("/")
async def home():
    result = await some_async_task()
    return result

有关 Flask 中异步视图的更多信息,请查看 Flask 2.0 中的异步一文。

Flask 中的异步也可以通过使用线程(并发)或多处理(并行)或 Celery 或 RQ 等工具来实现:

  1. Asynchronous Tasks with Flask and Celery:https://testdriven.io/blog/flask-and-celery/
  2. Asynchronous Tasks with Flask and Redis Queue:https://testdriven.io/blog/asynchronous-tasks-with-flask-and-redis-queue/

FastAPI

由于 FastAPI 对 asyncio 的原生支持,它极大地简化了异步任务。要使用的话,只需在视图函数中添加 async 关键字:

@app.get("/")
async def home():
    result = await some_async_task()
    return result

FastAPI 还具有后台任务功能,您可以使用它来定义返回响应后要运行的后台任务。这对于不需要在发送回响应之前完成的操作很有用。

from fastapi import BackgroundTasks

def process_file(filename: str):
    # process file :: takes minimum 3 secs (just an example)
    pass

@app.post("/upload/{filename}")
async def upload_and_process(filename: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_file, filename)
    return {"message""processing file"}

在这里,响应将被即时发送,而不会让用户等待文件处理完成。

当你需要进行繁重的后台计算时,或者你需要一个任务队列来管理任务(tasks)和工作者(workers)时,你可能想使用Celery 而不是 BackgroundTasks。更多内容请参考 FastAPI 和 Celery 的异步任务:https://testdriven.io/blog/fastapi-and-celery/

 依赖注入

Flask

虽然你可以实现自己的依赖注入解决方案,但 Flask 默认没有真正的一流支持。相反,你需要使用一个外部包,如 flask-injector。

FastAPI

另一方面,FastAPI 具有处理依赖注入的强大解决方案。

例如:

from databases import Database
from fastapi import Depends
from starlette.requests import Request

from db_helpers import get_all_data
def get_db(request: Request):
    return request.app.state._db

@app.get("/data")
def get_data(db: Database = Depends(get_db)):
    return get_all_data(db)

因此,get_db 将获取对在应用程序的启动事件处理程序中创建的数据库连接的引用。 Depends 然后用于向 FastAPI 指示路由“依赖于” get_db。因此,它应该在路由处理程序中的代码之前执行,并且结果应该“注入”到路由本身。

 数据校验

Flask

Flask 没有任何内部数据验证支持。您可以使用功能强大的 Pydantic 包通过 Flask-Pydantic 进行数据验证。

FastAPI

FastAPI 如此强大的原因之一是它支持 Pydantic。

from pydantic import BaseModel

app = FastAPI()

class Request(BaseModel):
    username: str
    password: str

@app.post("/login")
async def login(req: Request):
    if req.username == "testdriven.io" and req.password == "testdriven.io":
        return {"message""success"}
    return {"message""Authentication Failed"}

在这里,我们接受一个模型 Request 的输入。该 payload 必须包含一个用户名和密码。

# correct payload format
✗ curl -X POST 'localhost:8000/login' 
    --header 'Content-Type: application/json' 
    --data-raw '{"username": "testdriven.io","password":"testdriven.io"}'

{"message":"success"}

# incorrect payload format
✗ curl -X POST 'localhost:8000/login' 
    --header 'Content-Type: application/json' 
    --data-raw '{"username": "testdriven.io","passwords":"testdriven.io"}'

{"detail":[{"loc":["body","password"],"msg":"field required","type":"value_error.missing"}]}

注意到这个请求。我们把密码 passwords 作为一个键而不是 password 传递进去。Pydantic 模型会自动告诉用户,password 字段是缺失的。

 序列化和反序列化

Flask

最简单的序列化方法是使用 jsonify:

from flask import jsonify
from data import get_data_as_dict

@app.route("/")
def send_data():
    return jsonify(get_data_as_dict)

对于复杂的对象,Flask 开发者经常使用 Flask-Marshmallow

FastAPI

FastAPI 自动序列化任何返回的字典 dict 。对于更复杂和结构化的数据,使用 Pydantic:

from pydantic import BaseModel

app = FastAPI()

class Request(BaseModel):
    username: str
    email: str
    password: str

class Response(BaseModel):
    username: str
    email: str

@app.post("/login", response_model=Response)
async def login(req: Request):
    if req.username == "testdriven.io" and req.password == "testdriven.io":
        return req
    return {"message""Authentication Failed"}

在这里,我们添加了一个包含三个输入的 Request 模型:用户名、电子邮件和密码。我们还定义了一个仅包含用户名和电子邮件的 Response 模型。输入 Request 模型处理反序列化,而输出 Response 模型处理对象序列化。然后通过 response_model 参数将响应模型传递给装饰器。

现在,如果我们将请求本身作为响应返回,Pydantic 将省略 password ,因为我们定义的响应模型不包含密码字段。

例如:

# output
✗ curl -X POST 'localhost:8000/login' 
    --header 'Content-Type: application/json' 
    --data-raw '{"username":"testdriven.io","email":"admin@testdriven.io","password":"testdriven.io"}'

{"username":"testdriven.io","email":"admin@testdriven.io"}

 中间件

中间件被用来在每个请求被视图功能处理之前应用逻辑。

Flask

class middleware:
    def __init__(self, app) -> None:
        self.app = app

    def __call__(self, environ, start_response):
        start = time.time()
        response = self.app(environ, start_response)
        end = time.time() - start
        print(f"request processed in {end} s")
        return response

app = Flask(__name__)
app.wsgi_app = middleware(app.wsgi_app)

FastAPI

from fastapi import Request

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    print(f"request processed in {process_time} s")
    return response

@app.middleware("http") 装饰器是在 FastAPI 中创建中间件的必备工具。上述中间件计算处理请求所花费的时间。视图函数处理请求后,计算总处理时间并将其作为响应头返回。

# flask output(logs)
request processed in 0.0010077953338623047 s
127.0.0.1 - - [22/Sep/2020 18:56:21"GET / HTTP/1.1" 200 -

# fastapi output(logs)
request processed in 0.0009925365447998047 s
INFO:     127.0.0.1:51123 - "GET / HTTP/1.1" 200 OK

 模块化

随着应用程序的发展,在某些时候你会想把类似的视图、模板、静态文件和模型组合在一起,以帮助把应用程序分解成更小的组件。

Flask

在 Flask 中,蓝图被用来实现模块化:

# blueprints/product/views.py
from flask import Blueprint

product = Blueprint("product", __name__)

@product.route("/product1")
    ...
# main.py

from blueprints.product.views import product

app.register_blueprint(product)

FastAPI

同时,在 FastAPI 中,模块化是通过 APIRouter 实现的:

# routers/product/views.py
from fastapi import APIRouter

product = APIRouter()

@product.get("/product1")
    ...
# main.py

from routers.product.views import product

app.include_router(product)

# 其他特点

 自动文档

Flask

Flask 不会自动创建开箱即用的 API 文档。然而,有几个扩展可以处理这个问题,比如 flask-swagger 和 Flask RESTX,但它们需要额外的设置。

FastAPI

默认情况下,FastAPI 支持 OpenAPI 以及 Swagger UI 和 ReDoc。这意味着每个端点都自动从与端点关联的元数据中记录下来。

所有注册的端点都列在这里

此处列出了所有已注册的端点

替代文档

 管理应用

Flask

Flask 有一个广泛使用的第三方管理包,称为 Flask-Admin,用于快速对您的模型执行 CRUD 操作。

FastAPI

截至目前,有两个流行的 FastAPI 扩展用于此:

  1. FastAPI Admin – 功能性管理面板,提供用于对数据执行 CRUD 操作的用户界面。
  2. SQLAlchemy Admin -FastAPI/Starlette 的管理面板,可与 SQLAlchemy 模型一起使用。

 身份认证

Flask

虽然 Flask 没有原生解决方案,但可以使用多个第三方扩展。

FastAPI

FastAPI 通过 fastapi.security 包原生支持许多安全和身份验证工具。通过几行代码,您可以将基本的 HTTP 身份验证添加到您的应用程序中:

import secrets

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()

security = HTTPBasic()


def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
    correct_username = secrets.compare_digest(credentials.username, "stanleyjobson")
    correct_password = secrets.compare_digest(credentials.password, "swordfish")
    if not (correct_username and correct_password):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return credentials.username


@app.get("/whoami")
def who_ami_i(username: str = Depends(get_current_username)):
    return {"username": username}

FastAPI 通过 OpenAPI 标准实现 OAuth2 和 OpenID Connect。

查看官方文档中的以下资源以获取更多信息:

  1. Security Intro:https://fastapi.tiangolo.com/tutorial/security/
  2. Advanced Security:https://fastapi.tiangolo.com/advanced/security/

其他资源

  1. Web Authentication Methods Compared:https://testdriven.io/blog/web-authentication-methods/
  2. Adding Social Authentication to Flask:https://testdriven.io/blog/flask-social-auth/
  3. Session-based Auth with Flask for Single Page Apps:https://testdriven.io/blog/flask-spa-auth/
  4. Securing FastAPI with JWT Token-based Authentication:https://testdriven.io/blog/fastapi-jwt-auth/

 CORS

CORS(跨源资源共享)中间件检查请求是否来自允许的来源。如果是,则将请求传递给下一个中间件或视图函数。如果不是,它会拒绝请求,并将错误响应发送回调用者。

Flask Flask 需要一个名为 Flask-CORS 的外部包来支持 CORS:

pip install flask-cors

基本实现:

from flask_cors import CORS

app = Flask(__name__)

CORS(app)

FastAPI

FastAPI 原生支持 CORS:

from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = ["*"]

app.add_middleware(CORSMiddleware, allow_origins=origins)

# 测试

Flask

import pytest
from flask import Flask

app = Flask(__name__)

@app.route("/")
def home():
    return {"message""OK"}

def test_hello():
    res = app.test_client().get("/")

    assert res.status_code == 200
    assert res.data == b'{"message":"OK"}n'

FastAPI

from fastapi import FastAPI
from fastapi.testclient import TestClient

app = FastAPI()

@app.get("/")
async def home():
    return {"message""OK"}

client = TestClient(app)

def test_home():
    res = client.get("/")

    assert res.status_code == 200
    assert res.json() == {"message""OK"}

FastAPI 提供了一个 TestClient。有了它,你可以直接用 FastAPI 运行 pytest。有关更多信息,请查看官方文档中的测试指南。

# 部署

 生产服务器

Flask

Flask 默认运行开发 WSGI(Web 服务器网关接口)应用程序服务器。对于生产环境,您需要使用生产级 WSGI 应用服务器,例如 Gunicorn、uWSGI 或 mod_wsgi

安装 Gunicorn:

pip install gunicorn

启动服务:

# main.py
# app = Flask(__name__)

gunicorn main:app

FastAPI

由于 FastAPI 没有开发服务器,您将使用 Uvicorn(或 Daphne)进行开发和生产。

安装 Uvicorn:

pip install uvicorn

启动服务:

python
# main.py
# app = FastAPI()

uvicorn main:app

您可能希望使用 Gunicorn 来管理 Uvicorn,以便同时利用并发性(通过 Uvicorn)和并行性(通过 Gunicorn worker):

# main.py
# app = FastAPI()

gunicorn -w 3 -k uvicorn.workers.UvicornWorker main:app

 Docker

Flask

FROM python3.10-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY . .

EXPOSE 5000

CMD ["gunicorn""main:app"]

这是 Flask 最简单的 Dockerfile 之一。要了解如何针对生产对其进行全面配置,请查看使用 Postgres、Gunicorn 和 Nginx 教程对 Flask 进行 Docker 化。

FastAPI

sql
FROM python3.10-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn""main:app"]

同样,这是一个非常简单的配置。 FastAPI 作者提供了几个生产就绪的 Dockerfile。有关更多信息,请查看官方 FastAPI 文档以及 Dockerizing FastAPI with Postgres、Uvicorn 和 Traefik 教程。

# 总结

退一步讲,Django 和 Flask 是两个最流行的基于 Python 的网络框架(FastAPI 是第三大流行框架)。不过它们(Django 和 Flask)的理念非常不同。Flask 比 Django 的优势在于 Flask 是一个微框架。程序结构由程序员自己决定,不强制执行。开发者可以在他们认为合适的时候添加第三方扩展来改进他们的代码。也就是说,通常情况下,随着代码库的增长,需要一些几乎所有网络应用都需要的通用功能。这些功能与框架的紧密结合,使得终端开发者需要自己创建和维护的代码大大减少。

本文中的代码实例也表达了同样的意思。换句话说,FastAPI 包括许多必要的功能。它还遵循严格的标准,使你的代码可以生产并更容易维护。FastAPI 的文档也非常完善。

虽然 FastAPI 可能不像 Flask 那样久经考验,但越来越多的开发人员正在转向它来提供机器学习模型或开发 RESTful API。切换到 FastAPI 是一个不错的选择。

# 官方文档

  • FastAPI:https://fastapi.tiangolo.com/
  • Flask:https://flask.palletsprojects.com/en/2.0.x/

# 其他资源

  • Porting Flask to FastAPI for ML Model Serving:https://www.pluralsight.com/tech-blog/porting-flask-to-fastapi-for-ml-model-serving/
  • Why we switched from Flask to FastAPI for production machine learning:https://towardsdatascience.com/why-we-switched-from-flask-to-fastapi-for-production-machine-learning-765aab9b3679
  • Awesome Flask:https://github.com/mjhea0/awesome-flask
  • Awesome FastAPI:https://github.com/mjhea0/awesome-fastapi

作者:宇宙之一粟
链接:https://juejin.cn/post/7219225476706140218

用 Python 制作各种用途的二维码!

转自:网络

当你提到二维码时,大多数人想到的是仓库管理或产品标签等 “工业 “应用,但这篇文章在很大程度上是关于二维码的个人和社会用途。

有趣的事实

二维(QR)码是在1994年发明的,最近几年由于新冠肺炎的出现,它的”非接触 “特性使其应用广泛。

二维码具备良好的解决方案。它可以被几乎所有的手机使用默认的照片应用程序扫描,同样,扫描它们也会根据它们的背景触发某种动作。

例如,一个含有URL的QR码允许你在浏览器中打开它。含有Wifi登录信息的二维码允许你立即连接。含有联系人详细信息的二维码允许您在地址簿中创建一个新的联系人。带有地理坐标的二维码允许你在地图上找到一个位置。这使扫描者的生活变得非常容易,并使信息提供者完全脱离……他们不再需要为了发生互动而在场。

开始

在开始之前,我做了一些研究,并选择了Python中的segno模块,因为它有全面的功能列表和漂亮的文档。在谷歌搜索Python中的QR码时,它并没有出现在最前面,甚至在模块名称中也没有 “QR”,但不要因此而放弃–它是一个很棒的工具。

让我们先用 .make()方法创建一个最简单的QR码。它只包含可以复制或传输的原始数据,由于内容很短, segno默认创建一个有趣的 “微型QR “码。

  1. pip install segno

  1. import segno

  2.  

  3. price_tag = segno.make("£9.99")

  4. price_tag.save("Price Tag.png")

 

用 Python 制作各种用途的二维码!

你也可以使用方便的 .show方法,而不是用 .save来创建一个文件,然后导航到它,显示它,使用后再删除它。这将创建一个临时图像文件,并在你的默认图像查看器中自动打开。这对调试或测试很有帮助,特别是当你开始试验不同的颜色和背景图像,并想确认QR码仍能正常扫描时。

用于分享URL的QR码

使用同样的方法和稍大的有效载荷,我第一个任务(分享视频信息)的Python代码是微不足道的。

  1. import segno

  2.  

  3. video = segno.make('https://youtu.be/px6FeOKD3Zc')

  4. video.save('Video.png', scale=4)

 

用 Python 制作各种用途的二维码!

只需多写一行代码,我就能创建一个更加丰富多彩的QR码,在这种情况下,我最喜欢的一张图片实际上是一个用Piet编程语言编写的 “Hello World “脚本。

  1. pip install qrcode-artistic

  1. import segno

  2.  

  3. piet = segno.make('https://esolangs.org/wiki/Piet', error='h')

  4. piet.to_artistic(background="background.png", target='Piet.png', scale=16)

用 Python 制作各种用途的二维码!

携带WIFI详细信息的QR码

我的第二个任务(WIFI登录细节)的Python代码也同样简单,但我定制了颜色并使输出更大。

  1. import segno

  2.  

  3. wifi_settings = {

  4. ssid='(Wifi Name)',

  5. password='(Wifi Password)',

  6. security='WPA',

  7. }

  8. wifi = segno.helpers.make_wifi(**wifi_settings)

  9. wifi.save("Wifi.png", dark="yellow", light="#323524", scale=8)

 

用 Python 制作各种用途的二维码!

联系信息的二维码

在这些快速成功的鼓励下,我决定为一个朋友的艺术和手工艺业务创建一个二维码。

  1. import segno

  2.  

  3. vcard = segno.helpers.make_vcard(

  4. name='Pxxx;Jxxx',

  5. displayname='Times Tables Furniture',

  6. email=('jxxxpxxx@timestables.furniture'),

  7. url=[

  8. 'https://www.etsy.com/uk/shop/TimesTablesFurniture',

  9. 'https://www.facebook.com/profile.php?id=100083448533180'

  10. ],

  11. phone="+44xxxxxxxxxx",

  12. )

  13.  

  14. img = vcard.to_pil(scale=6, dark="#FF7D92").rotate(45, expand=True)

  15. img.save('Etsy.png')

 

用 Python 制作各种用途的二维码!

对于我自己的VCard,我选择添加我公司的标志作为背景。

  1. import segno

  2.  

  3. awsom = segno.helpers.make_vcard(

  4. name='Fison;Pete',

  5. displayname='AWSOM Solutions Ltd.',

  6. email=('pxxxfxxx@awsom.solutions'),

  7. url=[

  8. 'https://twitter.com/awsom_solutions',

  9. 'https://medium.com/@petefison',

  10. 'https://github.com/pfython'

  11. ],

  12. phone="+44xxxxxxxxxx",

  13. )

  14.  

  15. awsom.to_artistic(

  16. background="logo.png",

  17. target='AWSOM.png',

  18. scale=6,

  19. quiet_zone="#D29500"

  20. )

 

用 Python 制作各种用途的二维码!

用于其他目的的二维码

segno API还允许你做以下事情。

segno.helpers.make_email : 发送一封预先准备好主题和内容的电子邮件。对于订阅新闻简报,或者从邮件服务器上触发任何可能的行动,都是非常好的。

segno.helpers.make_epc_qr: 发起一个电子支付。

segno.helpers.make_geo: 在一个特定的经度和纬度打开默认的地图应用。

segno.make_sequence : 使用 “结构化附加 “模式创建一个QR码序列。

把所有东西都保存在内存中

如果你喜欢把所有的处理保持在 “内存中”,而不是在硬盘或服务器上创建文件,你可以创建一个PIL图像对象,或者使用BytesIO保存一个类似文件的对象:

  1. import segno

  2.  

  3. beatle = segno.make('Paul McCartney')

  4. beatle = qrcode.to_pil()

  1. import segno

  2. import io

  3.  

  4. beatle = segno.make('Paul McCartney')

  5. buff = io.BytesIO()

  6. beatle.save(buff, kind='svg')

同样,如果你喜欢直接从URL中加载背景图片到内存中,而不是先在硬盘或服务器上创建一个文件,你可以使用urlopen方法。

  1. from urllib.request import urlopen

  2. import segno

  3.  

  4. beatle = segno.make('Ringo Starr', error='h')

  5. url = 'https://media.giphy.com/media/HNo1tVKdFaoco/giphy.gif'

  6. bg_file = urlopen(url)

  7. beatle.to_artistic(background=bg_file, target='ringo.gif', scale=10)

二维码的创造性、家用式的想法

希望这篇短文能让您对使用二维码有兴趣,不仅仅是用于 “工业 “项目,而且还用于个人和社会项目。网上有很多文章,建议将二维码创造性地用于商业和营销,因此,在本文的最后,我想分享一些我自己的 “家用式 “想法,可能会吸引你。

  • 在你的垃圾桶边上有关于回收规则的信息

  • 触发一封电子邮件给亲人,说你已经安全到家。

  • 触发一个更新,说你已经离开了家。

  • 在你所在的城镇或乡村道路上寻宝;链接到你自己的网站,包括当地信息、社会媒体团体、当前的地理位置等。

  • 在你的房子周围为年幼的孩子寻宝,或举行晚宴。

  • 在明信片上贴上二维码,让家人和朋友直接进入你的旅行日记、照片日记或博客中的最新条目。

  • 洗衣机、微波炉、烤箱、打印机、锅炉、3D打印机、激光切割器、甚至汽车等电器的说明书。

  • 你的家谱或历史,或财产信息保存起来供后人参考。

  • 一个在线留言簿,游客可以记录他们的逗留,并留下个人的信息。

  • 冰箱上的贴纸,链接到最新的家庭购物清单。

  • 每个家庭成员的每周家务事清单。

  • 笔记本电脑、电话、相机、无人机等的 “如果丢失,请归还…… “贴纸。

  • 诚信箱–让人们在使用/消费/购买东西时付款,例如,从共享冰箱中的食物和饮料,在农场外出售的鸡蛋。

  • 预约管理电视/互联网/游戏的特权。

  • 婴儿保姆或宠物保姆的紧急联系信息。

  • 在停电的情况下为你提供紧急联络方式–水、电、煤气。

  • 为你看家护院的人提供当地的食品配送公司。

  • 个人视频信息/提醒。

  • 关于你最喜欢的装饰品或房子周围的艺术品的信息。

  • 你的酒架/酒窖的品酒说明。

  • 花园植物和树木的标签–物种、浇水、年龄等细节。

 

 

– EOF –

转自:https://mp.weixin.qq.com/s/cJ-_VT3jYxRYCjA5aqH-ZA

监控 Python 内存使用情况和代码执行时间

我的代码的哪些部分运行时间最长、内存最多?我怎样才能找到需要改进的地方?”

在开发过程中,我很确定我们大多数人都会想知道这一点,在本文中总结了一些方法来监控 Python 代码的时间和内存使用情况。

本文将介绍4种方法,前3种方法提供时间信息,第4个方法可以获得内存使用情况。

  • time 模块
  • %%time 魔法命令
  • line_profiler
  • memory_profiler

 1. time 模块

这是计算代码运行所需时间的最简单、最直接(但需要手动开发)的方法。他的逻辑也很简单:记录代码运行之前和之后的时间,计算时间之间的差异。这可以实现如下:

 import time

 start_time = time.time()
 result = 5+2
 end_time = time.time()

 print('Time taken = {} sec'.format(end_time - start_time))

下面的例子显示了for循环和列表推导式在时间上的差异:

 import time

 # for loop vs. list comp
 list_comp_start_time = time.time()
 result = [i for i in range(0,1000000)]
 list_comp_end_time = time.time()
 print('Time taken for list comp = {} sec'.format(list_comp_end_time - list_comp_start_time))

 result=[]
 for_loop_start_time = time.time()
 for i in range(0,1000000):
     result.append(i)
 for_loop_end_time = time.time()
 print('Time taken for for-loop = {} sec'.format(for_loop_end_time - for_loop_start_time))

 list_comp_time = list_comp_end_time - list_comp_start_time
 for_loop_time = for_loop_end_time - for_loop_start_time
 print('Difference = {} %'.format((for_loop_time - list_comp_time)/list_comp_time * 100))

我们都知道for会慢一些

 Time taken for list comp = 0.05843973159790039 sec
 Time taken for for-loop = 0.06774497032165527 sec
 Difference = 15.922795107582594 %

 2. %%time 魔法命令

魔法命令是IPython内核中内置的方便命令,可以方便地执行特定的任务。一般情况下都实在jupyter notebook种使用。

在单元格的开头添加%%time ,单元格执行完成后,会输出单元格执行所花费的时间。

 %%time
 def convert_cms(cm, unit='m'):
     '''
    Function to convert cm to m or feet
    '''
     if unit == 'm':
         return cm/100
     return cm/30.48

 convert_cms(1000)

结果如下:

 CPU timesuser 24 µssys: 1 µstotal: 25 µs
 Wall time: 28.1 µs

 Out[8]: 10.0

这里的CPU times是CPU处理代码所花费的实际时间,Wall time是事件经过的真实时间,在方法入口和方法出口之间的时间。

 3. line_profiler

前两个方法只提供执行该方法所需的总时间。通过时间分析器我们可以获得函数中每一个代码的运行时间。

这里我们需要使用line_profiler包。使用pip install line_profiler。

 import line_profiler

 def convert_cms(cm, unit='m'):
     '''
    Function to convert cm to m or feet
    '''
     if unit == 'm':
         return cm/100
     return cm/30.48

 # Load the profiler
 %load_ext line_profiler

 # Use the profiler's magic to call the method
 %lprun -f convert_cms convert_cms(1000'f')

输出结果如下:

 Timer unit: 1e-06 s

 Total time: 4e-06 s
 File: /var/folders/y_/ff7_m0c146ddrr_mctd4vpkh0000gn/T/ipykernel_22452/382784489.py
 Function: convert_cms at line 1

 Line #     Hits         Time Per Hit   % Time Line Contents
 ==============================================================
      1                                           def convert_cms(cm, unit='m'):
      2                                               '''
      3                                               Function to convert cm to m or feet
      4                                               '''
      5         1         2.0     2.0     50.0     if unit == 'm':
      6                                                   return cm/100
      7         1         2.0     2.0     50.0     return cm/30.48

可以看到line_profiler提供了每行代码所花费时间的详细信息。

  • Line Contents :运行的代码
  • Hits:行被执行的次数
  • Time:所花费的总时间(即命中次数x每次命中次数)
  • Per Hit:一次执行花费的时间,也就是说 Time =  Hits X Per Hit
  • % Time:占总时间的比例

可以看到,每一行代码都详细的分析了时间,这对于我们分析时间相当的有帮助。

 4. memory_profiler

与line_profiler类似,memory_profiler提供代码的逐行内存使用情况。

要安装它需要使用pip install memory_profiler。我们这里监视convert_cms_f函数的内存使用情况

 from conversions import convert_cms_f
 import memory_profiler

 %load_ext memory_profiler

 %mprun -f convert_cms_f convert_cms_f(1000'f')

convert_cms_f函数在单独的文件中定义,然后导入。结果如下:

 Line #   Mem usage   Increment Occurrences   Line Contents
 =============================================================
      1     63.7 MiB     63.7 MiB           1   def convert_cms_f(cm, unit='m'):
      2                                             '''
      3                                             Function to convert cm to m or feet
      4                                             '''
      5     63.7 MiB     0.0 MiB           1       if unit == 'm':
      6                                                 return cm/100
      7     63.7 MiB     0.0 MiB           1       return cm/30.48memory_profiler 提供对每行代码内存使用情况的详细了解。

这里的1 MiB (MebiByte) 几乎等于 1MB。1 MiB  = 1.048576 1MB

但是memory_profiler 也有一些缺点:它通过查询操作系统内存,所以结果可能与 python 解释器略有不同,如果在会话中多次运行 %mprun,可能会注意到增量列报告所有代码行为 0.0 MiB。这是因为魔法命令的限制导致的。

虽然memory_profiler有一些问题,但是它就使我们能够清楚地了解内存使用情况,对于开发来说是一个非常好用的工具

 5. 总结一下

虽然Python并不是一个以执行效率见长的语言,但是在某些特殊情况下这些命令对我们还是非常有帮助的。

转自:https://mp.weixin.qq.com/s/oKm5oDVsXxX8UZd7FUQaFg

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

从网站中抓取数据是开发者的一个典型“用例”。无论它是属于副业项目,还是你正在成立一个初创公司,抓取数据似乎都很有必要。

举个例子,倘若您想要创建一个比价网站,那么您会需要从各种电商网站上抓取价格信息;或者您想要构建一个可以识别商品并在亚马逊上自动查找价格的“人工智能”。类似的场景还有很多。

但是您有没有注意到,获取所有页面信息的速度有多慢呢?您会选择一个接一个地去抓取商品吗?应该会有更好的解决方案吧?答案是肯定的。

抓取网页可能非常耗时,因为您必须花时间等待服务器响应,抑或是速率受限。这就是为什么我们要向您展示如何通过在 Python 中使用并发来加速您的网页数据抓取项目

前提

为了使代码正常运行,您需要安装 python 3[1]。部分系统可能已经预装了它。然后您还需要使用 pip install 安装所有必要的库。

pip install requests beautifulsoup4 aiohttp numpy

如果您了解并发背后的基础知识,可以跳过理论部分直接进入实际操作环节。

并发

并发是一个术语,用于描述同时运行多个计算任务的能力。

当您按顺序向网站发出请求时,您可以选择一次发出一个请求并等待结果返回,然后再发出下一个请求。

不过,您也可以同时发送多个请求,并在它们返回时处理对应的结果,这种方式的速度提升效果是非常显著的。与顺序请求相比,并发请求无论是否并行运行(多个 CPU),都会比前者快得多 — 稍后会详细介绍。

要理解并发的优势。我们需要了解顺序处理和并发处理任务之间的区别。假设我们有五个任务,每个任务需要 10 秒才能完成。当按顺序处理它们时,完成五个任务所需的时间为 50 秒;而并发处理时,仅需要 10 秒即可完成。

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

除了提高处理速度之外,并发还允许我们通过将网页抓取任务负载分布于多个进程中,来实现在更短的时间内完成更多的工作。

这里有几种实现并行化请求的方式:例如 multiprocessingasyncio。从网页抓取的角度来看,我们可以使用这些库来并行处理对不同网站或同一网站不同页面的请求。在本文中,我们将重点关注 asyncio,这是一个 Python 内置的模块,它提供了使用协程编写单线程并发代码的基础设施。

由于并发意味着更复杂的系统和代码,因此在使用前请考虑在您的使用场景中是否利大于弊。

并发的优势

  • 在更短的时间内完成更多的工作
  • 可以将空闲的网络时间投入到其他请求中

并发的危险之处

  • 更不易于开发和调试
  • 可能存在竞争条件
  • 需要检查并使用线程安全的函数
  • 一不小心就会增加程序阻塞的概率
  • 并发自带系统开销,因此需要设置合理的并发级别
  • 针对小型站点请求过多的话,可能会变成 DDoS 攻击

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

 

*同时释放所有请求时要小心*

 

为何选择 asyncio

在做出选择之前,我们有必要了解一下 asynciomultiprocessing 之间的区别,以及 IO 密集型与 CPU 密集型之间的区别。

asyncio[2] “是一个使用 async/await 语法编写并发代码的库”,它在单个处理器上运行。

multiprocessing[3] “是一个支持使用 API 生产进程的包 […] 允许程序员充分利用给定机器上的多个处理器”。每个进程将在不同的 CPU 中启动自己的 Python 解释器。

IO 密集型意味着程序将受 I/O 影响而变得运行缓慢。在我们的案例中,主要指的是网络请求。

CPU 密集型意味着程序会由于 CPU 计算压力导致运行缓慢 — 例如数学计算。

为什么这会影响我们选择用于并发的库?因为并发成本的很大一部分是创建和维护线程/进程。对于 CPU 密集型问题,在不同的 CPU 中拥有多个进程将会提升效率。但对于 I/O 密集型的场景,情况可能并非如此。

由于网页数据抓取主要受 I/O 限制,因此我们选择了 asyncio。但如果有疑问(或只是为了好玩),您可以使用 multiprocessing 尝试这个场景并比较一下结果。

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

顺序实现的版本

我们将从抓取 scrapeme.live 作为示例开始,这是一个专门用于测试的电子商务网站。

首先,我们将从顺序抓取的版本开始。以下几个片段是所有案例的一部分,因此它们将保持不变。

通过访问目标主页,我们发现它有 48 个子页面。由于是测试环境,这些子页面不会很快发生变化,我们会使用到以下两个常量:

base_url = "https://scrapeme.live/shop/page" 
pages = range(149# max page (48) + 1

现在,从目标产品中提取基础数据。为此,我们使用 requests.get 获取 HTML 内容,然后使用 BeautifulSoup 解析它。我们将遍历每个产品并从中获取一些基本信息。所有选择器都来自对内容的手动审查(使用 DevTools),但为简洁起见,我们不会在这里详细介绍。

import requests 
from bs4 import BeautifulSoup 
 
def extract_details(page): 
 # concatenate page number to base URL 
 response = requests.get(f"{base_url}/{page}/"
 soup = BeautifulSoup(response.text, "html.parser"
 
 pokemon_list = [] 
 for pokemon in soup.select(".product"): # loop each product 
  pokemon_list.append({ 
   "id": pokemon.find(class_="add_to_cart_button").get("data-product_id"), 
   "name": pokemon.find("h2").text.strip(), 
   "price": pokemon.find(class_="price").text.strip(), 
   "url": pokemon.find(class_="woocommerce-loop-product__link").get("href"), 
  }) 
 return pokemon_list

extract_details 函数将获取一个页码并将其连接起来,用于创建子页面的 URL。获取内容并创建产品数组后返回。这意味着返回的值将是一个字典列表,这是一个后续使用的必要细节。

我们需要为每个页面运行上面的函数,获取所有结果,并存储它们。

import csv 
 
# modified to avoid running all the pages unintentionally 
pages = range(13
 
def store_results(list_of_lists): 
 pokemon_list = sum(list_of_lists, []) # flatten lists 
 
 with open("pokemon.csv""w"as pokemon_file: 
  # get dictionary keys for the CSV header 
  fieldnames = pokemon_list[0].keys() 
  file_writer = csv.DictWriter(pokemon_file, fieldnames=fieldnames) 
  file_writer.writeheader() 
  file_writer.writerows(pokemon_list) 
 
list_of_lists = [ 
 extract_details(page) 
 for page in pages 

store_results(list_of_lists)

运行上面的代码将获得两个产品页面,提取产品(总共 32 个),并将它们存储在一个名为 pokemon.csv 的 CSV 文件中。 store_results 函数不影响顺序或并行模式下的抓取。你可以跳过它。

由于结果是列表,我们必须将它们展平以允许 writerows 完成其工作。这就是为什么我们将变量命名为list_of_lists(即使它有点奇怪),只是为了提醒大家它不是扁平的。

输出 CSV 文件的示例:

id name price url
759 Bulbasaur £63.00 https://scrapeme.live/shop/Bulbasaur/
729 Ivysaur £87.00 https://scrapeme.live/shop/Ivysaur/
730 Venusaur £105.00 https://scrapeme.live/shop/Venusaur/
731 Charmander £48.00 https://scrapeme.live/shop/Charmander/
732 Charmeleon £165.00 https://scrapeme.live/shop/Charmeleon/

如果您要为每个页面 (48) 运行脚本,它将生成一个包含 755 个产品的 CSV 文件,并花费大约 30 秒。

time python script.py 
 
real 0m31,806s 
user 0m1,936s 
sys 0m0,073s

asyncio 介绍

我们知道我们可以做得更好。如果我们同时执行所有请求,它应该花费更少时间,对吧?也许会和执行最慢的请求所花费的时间相等。

并发确实应该运行得更快,但它也涉及一些开销。所以这不是线性的数学改进。

为此,我们将使用上面提到的 asyncio。它允许我们在事件循环中的同一个线程上运行多个任务(就像 Javascript 一样)。它将运行一个函数,并在运行时允许时将上下文切换到不同的上下文。在我们的例子中,HTTP 请求允许这种切换。

我们将开始看到一个 sleep 一秒钟的示例。并且脚本应该需要一秒钟才能运行。请注意,我们不能直接调用 main。我们需要让 asyncio 知道它是一个需要执行的异步函数。

import asyncio 
 
async def main(): 
 print("Hello ..."
 await asyncio.sleep(1
 print("... World!"
 
asyncio.run(main())
time python script.py 
Hello ... 
... World! 
 
real 0m1,054s 
user 0m0,045s 
sys 0m0,008s

简单的并行代码

接下来,我们将扩展一个示例案例来运行一百个函数。它们每个都会 sleep 一秒钟并打印一个文本。如果我们按顺序运行它们大约需要一百秒。使用 asyncio,只需要一秒!

这就是并发背后的力量。如前所述,对于纯 I/O 密集型任务,它将执行得更快 – sleep 不是,但它对示例很重要。

我们需要创建一个辅助函数,它会 sleep 一秒钟并打印一条消息。然后,我们编辑 main 以调用该函数一百次,并将每个调用存储在一个任务列表中。最后也是关键的部分是执行并等待所有任务完成。这就是 asyncio.gather[4] 所做的事情。

import asyncio 
 
async def demo_function(i): 
 await asyncio.sleep(1
 print(f"Hello {i}"
 
async def main(): 
 tasks = [ 
  demo_function(i) 
  for i in range(0100
 ] 
 await asyncio.gather(*tasks) 
 
asyncio.run(main())

正如预期的那样,一百条消息和一秒钟的执行时间。完美!

使用 asyncio 进行抓取

我们需要将这些知识应用于数据抓取。遵循的方法是同时请求并返回产品列表,并在所有请求完成后存储它们。每次请求后或者分批保存数据可能会更好,以避免实际情况下的数据丢失。

我们的第一次尝试不会有并发限制,所以使用时要小心。在使用数千个 URL 运行它的情况下……好吧,它几乎会同时执行所有这些请求。这可能会给服务器带来巨大的负载,并可能会损害您的计算机。

requests 不支持开箱即用的异步,因此我们将使用 aiohttp [5] 来避免复杂化。 requests 可以完成这项工作,并且没有实质性的性能差异。但是使用 aiohttp 代码更具可读性。

import asyncio 
import aiohttp 
from bs4 import BeautifulSoup 
 
async def extract_details(page, session): 
 # similar to requests.get but with a different syntax 
 async with session.get(f"{base_url}/{page}/"as response: 
 
  # notice that we must await the .text() function 
  soup = BeautifulSoup(await response.text(), "html.parser"
 
  # [...] same as before 
  return pokemon_list 
 
async def main(): 
 # create an aiohttp session and pass it to each function execution 
 async with aiohttp.ClientSession() as session: 
  tasks = [ 
   extract_details(page, session) 
   for page in pages 
  ] 
  list_of_lists = await asyncio.gather(*tasks) 
  store_results(list_of_lists) 
 
asyncio.run(main())

CSV 文件应该像以前一样包含每个产品的信息 (共 755 个)。由于我们同时执行所有页面调用,结果不会按顺序到达。如果我们将结果添加到 extract_details 内的文件中,它们可能是无序的。但我们会等待所有任务完成然后处理它们,因此顺序性不会有太大影响。

time python script.py 
 
real 0m11,442s 
user 0m1,332s 
sys 0m0,060s

我们做到了!速度提升了 3 倍,但是……不应该是 40 倍吗?没那么简单。许多因素都会影响性能(网络、CPU、RAM 等)。

在这个演示页面中,我们注意到当执行多个调用时,响应时间会变慢,这可能是设计使然。一些服务器/提供商可以限制并发请求的数量,以避免来自同一 IP 的过多流量。它不是一种阻塞,而是一个队列。你会得到服务响应,但需要稍等片刻。

要查看真正的加速,您可以针对延迟[6]页面进行测试。这是另一个测试页面,它将等待 2 秒然后返回响应。

base_url = "https://httpbin.org/delay/2" 
#... 
 
async def extract_details(page, session): 
 async with session.get(base_url) as response: 
  #...

这里去掉了所有的提取和存储逻辑,只调用了延迟 URL 48 次,并在 3 秒内运行完毕。

time python script.py 
 
real 0m2,865s 
user 0m0,245s 
sys 0m0,031s

使用信号量限制并发

如上所述,我们应该限制并发请求的数量,尤其是针对单个域名。

asyncio 带有 Semaphore[7],一个将获取和释放锁的对象。它的内部功能将阻塞一些调用,直到获得锁,从而创建最大的并发性。

我们需要创建尽可能最大值的信号量。然后等待提取函数运行,直到 async with sem 可用。

max_concurrency = 3 
sem = asyncio.Semaphore(max_concurrency) 
 
async def extract_details(page, session): 
 async with sem: # semaphore limits num of simultaneous downloads 
  async with session.get(f"{base_url}/{page}/"as response: 
   # ... 
 
async def main(): 
  # ... 
 
loop = asyncio.get_event_loop() 
loop.run_until_complete(main())

它完成了工作,并且相对容易实现!这是最大并发设置为 3 的输出。

time python script.py 
 
real 0m13,062s 
user 0m1,455s 
sys 0m0,047s

这表明无限并发的版本并没有全速运行。如果我们将限制增加到 10,总时间与未限制的脚本运行时间相近。

使用 TCPConnector 限制并发

aiohttp 提供了一种替代解决方案,可提供进一步的配置。我们可以创建传入自定义 TCPConnector[8] 的客户端会话。

我们可以使用两个适合我们需求的参数来构建它:

  • limit – “同时连接的总数”。
  • limit_per_host – “限制同时连接到同一端点的连接数”(同一主机、端口和 is_ssl)。
max_concurrency = 10 
max_concurrency_per_host = 3 
 
async def main(): 
 connector = aiohttp.TCPConnector(limit=max_concurrency, limit_per_host=max_concurrency_per_host) 
 async with aiohttp.ClientSession(connector=connector) as session: 
  # ... 
 
asyncio.run(main())

这种写法也易于实施和维护!这是每个主机最大并发设置为 3 的输出。

time python script.py 
 
real 0m16,188s 
user 0m1,311s 
sys 0m0,065s

Semaphore 相比的优势是可以选择限制每个域的并发调用和请求的总量。我们可以使用同一个会话来抓取不同的站点,每个站点都有自己的限制。

缺点是它看起来有点慢。需要针对真实案例,使用更多页面和实际数据运行一些测试。

multiprocessing

就像我们之前看到的那样,数据抓取是 I/O 密集型的。但是,如果我们需要将它与一些 CPU 密集型计算混合怎么办?为了测试这种情况,我们将使用一个函数,该函数将在每个抓取的页面之后 count_a_lot。这是强制 CPU 忙碌一段时间的简单(且有些愚蠢)的方法。

def count_a_lot(): 
 count_to = 100_000_000 
 counter = 0 
 while counter < count_to: 
  counter = counter + 1 
 
async def extract_details(page, session): 
 async with session.get(f"{base_url}/{page}/"as response: 
  # ... 
  count_a_lot() 
  return pokemon_list

对于 asyncio 版本,只需像以前一样运行它。可能需要很长时间⏳。

time python script.py 
 
real 2m37,827s 
user 2m35,586s 
sys 0m0,244s

现在,比较难理解的部分来了:

直接引入 multiprocessing 看起来有点困难。实际上,我们需要创建一个 ProcessPoolExecutor,它能够“使用一个进程池来异步执行调用”。它将处理不同 CPU 中每个进程的创建和控制

但它不会分配负载。为此,我们将使用 NumPyarray_split,它会根据 CPU 的数量将页面范围分割成相等的块。

main 函数的其余部分类似于 asyncio 版本,但更改了一些语法以匹配 multiprocessing 的语法风格。

此处的本质区别是我们不会直接调用extract_details。实际上是可以的,但我们将尝试通过将 multiprocessingasyncio 混合使用来获得最好的执行效率。

from concurrent.futures import ProcessPoolExecutor 
from multiprocessing import cpu_count 
import numpy as np 
 
num_cores = cpu_count() # number of CPU cores 
 
def main(): 
 executor = ProcessPoolExecutor(max_workers=num_cores) 
 tasks = [ 
  executor.submit(asyncio_wrapper, pages_for_task) 
  for pages_for_task in np.array_split(pages, num_cores) 
 ] 
 doneTasks, _ = concurrent.futures.wait(tasks) 
 
 results = [ 
  item.result() 
  for item in doneTasks 
 ] 
 store_results(results) 
 
main()

长话短说,每个 CPU 进程都会有几页需要抓取。一共有 48 个页面,假设你的机器有 8 个 CPU,每个进程将请求 6 个页面(6 * 8 = 48)。

这六个页面将同时运行!之后,计算将不得不等待,因为它们是 CPU 密集型的。但是我们有很多 CPU,所以它们应该比纯 asyncio 版本运行得更快。

async def extract_details_task(pages_for_task): 
 async with aiohttp.ClientSession() as session: 
  tasks = [ 
   extract_details(page, session) 
   for page in pages_for_task 
  ] 
  list_of_lists = await asyncio.gather(*tasks) 
  return sum(list_of_lists, []) 
 
 
def asyncio_wrapper(pages_for_task): 
 return asyncio.run(extract_details_task(pages_for_task))

这就是神奇的地方。每个 CPU 进程将使用页面的子集启动一个 asyncio(例如,第一个页面从 1 到 6)。

然后,每一个都将调用几个 URL,使用已知的 extract_details 函数。

上述内容需要花点时间来吸收它。整个过程是这样的:

  1. 创建执行器
  2. 拆分页面
  3. 每个进程启动 asyncio
  4. 创建一个 aiohttp 会话并创建页面子集的任务
  5. 提取每一页的数据
  6. 合并并存储结果

下面是本次的执行时间。虽然之前我们没有提到它,但这里的 user 时间却很显眼。对于仅运行 asyncio 的脚本:

time python script.py 
 
real 2m37,827s 
user 2m35,586s 
sys 0m0,244s

具有 asyncio 和多个进程的版本:

time python script.py 
 
real 0m38,048s 
user 3m3,147s 
sys 0m0,532s

发现区别了吗?实际运行时间方面第一个用了两分钟多,第二个用了 40 秒。但是在总 CPU 时间(user 时间)中,第二个超过了三分钟!看起来系统开销的耗时确实有点多。

这表明并行处理“浪费”了更多时间,但程序是提前完成的。显然,您在决定选择哪种方法时,需要考虑到开发和调试的复杂度。

结论

我们已经看到 asyncio 足以用于抓取,因为大部分运行时间都用于网络请求,这种场景属于 I/O 密集型并且适用于单核中的并发处理。

如果收集的数据需要一些 CPU 密集型工作,这种情况就会改变。虽然有关计数的例子有一点愚蠢,但至少你理解了这种场景。

在大多数情况下,带有 aiohttpasyncio 比异步的 requests 更适合完成目标工作。同时我们可以添加自定义连接器以限制每个域名的请求数、并发请求总数。有了这三个部分,您就可以开始构建一个可以扩展的数据抓取程序了。

另一个重要的部分是允许新的 URL/任务加入程序运行(类似于队列),但这是另一篇文章的内容。敬请关注!

参考资料

[1]

 

python 3: https://www.python.org/downloads/

[2]

asyncio: https://docs.python.org/3/library/asyncio.html

[3]

multiprocessing: https://docs.python.org/3/library/multiprocessing.html

[4]

asyncio.gather: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

[5]

aiohttp: https://docs.aiohttp.org/

[6]

延迟: https://httpbin.org/delay/2

[7]

Semaphore: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore

[8]

TCPConnector: https://docs.aiohttp.org/en/stable/client_reference.html#tcpconnector

[9]

参考原文: https://www.zenrows.com/blog/speed-up-web-scraping-with-concurrency-in-python

 

 

– EOF –

 

转自:https://mp.weixin.qq.com/s/pPpc7CH4Rr9WTa4p8ue2Iw

python-office库

1行代码,生成动漫头像

听说某宝需要50块钱一张?别再去交智商税了!

1. 安装python-office

安装很简单,在有python环境的电脑上,只需要执行下面这一行命令。

“ 如果你之前使用过python-office这个库,也需要执行一下,可以下载到最新版本~

安装

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple python-office -U

如果你的电脑里还没有安装python环境,可以看一下下面这个6分钟的傻瓜式安装教程,有电脑就能操作~

作者:程序员晚枫
链接:https://zhuanlan.zhihu.com/p/531681488
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

2. 生成动漫头像

直接上代码!

代码

# 导入这个库:python-office,简写为office
import office

# 1行代码,实现 人像转动漫头像
office.image.img2Cartoon(path = 'd://image//程序员晚枫.jpg')

# 参数说明:
# path:存放自己真人照片的位置 + PDF的文件名,例如:d://image//程序员晚枫.pdf

直接运行以上代码,就会得到一张转化后的动漫头像了。

“ 程序可能需要运行20秒左右。

3.全部功能

1行代码实现复杂功能,是不是使用起来很方便?

“ 项目已被收录进【开源中国】、【Python官网】等平台,所有功能,免费给大家使用:GitHub

4. python-office库,近期添加的功能