抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

从网站中抓取数据是开发者的一个典型“用例”。无论它是属于副业项目,还是你正在成立一个初创公司,抓取数据似乎都很有必要。

举个例子,倘若您想要创建一个比价网站,那么您会需要从各种电商网站上抓取价格信息;或者您想要构建一个可以识别商品并在亚马逊上自动查找价格的“人工智能”。类似的场景还有很多。

但是您有没有注意到,获取所有页面信息的速度有多慢呢?您会选择一个接一个地去抓取商品吗?应该会有更好的解决方案吧?答案是肯定的。

抓取网页可能非常耗时,因为您必须花时间等待服务器响应,抑或是速率受限。这就是为什么我们要向您展示如何通过在 Python 中使用并发来加速您的网页数据抓取项目

前提

为了使代码正常运行,您需要安装 python 3[1]。部分系统可能已经预装了它。然后您还需要使用 pip install 安装所有必要的库。

pip install requests beautifulsoup4 aiohttp numpy

如果您了解并发背后的基础知识,可以跳过理论部分直接进入实际操作环节。

并发

并发是一个术语,用于描述同时运行多个计算任务的能力。

当您按顺序向网站发出请求时,您可以选择一次发出一个请求并等待结果返回,然后再发出下一个请求。

不过,您也可以同时发送多个请求,并在它们返回时处理对应的结果,这种方式的速度提升效果是非常显著的。与顺序请求相比,并发请求无论是否并行运行(多个 CPU),都会比前者快得多 — 稍后会详细介绍。

要理解并发的优势。我们需要了解顺序处理和并发处理任务之间的区别。假设我们有五个任务,每个任务需要 10 秒才能完成。当按顺序处理它们时,完成五个任务所需的时间为 50 秒;而并发处理时,仅需要 10 秒即可完成。

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

除了提高处理速度之外,并发还允许我们通过将网页抓取任务负载分布于多个进程中,来实现在更短的时间内完成更多的工作。

这里有几种实现并行化请求的方式:例如 multiprocessingasyncio。从网页抓取的角度来看,我们可以使用这些库来并行处理对不同网站或同一网站不同页面的请求。在本文中,我们将重点关注 asyncio,这是一个 Python 内置的模块,它提供了使用协程编写单线程并发代码的基础设施。

由于并发意味着更复杂的系统和代码,因此在使用前请考虑在您的使用场景中是否利大于弊。

并发的优势

  • 在更短的时间内完成更多的工作
  • 可以将空闲的网络时间投入到其他请求中

并发的危险之处

  • 更不易于开发和调试
  • 可能存在竞争条件
  • 需要检查并使用线程安全的函数
  • 一不小心就会增加程序阻塞的概率
  • 并发自带系统开销,因此需要设置合理的并发级别
  • 针对小型站点请求过多的话,可能会变成 DDoS 攻击

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

 

*同时释放所有请求时要小心*

 

为何选择 asyncio

在做出选择之前,我们有必要了解一下 asynciomultiprocessing 之间的区别,以及 IO 密集型与 CPU 密集型之间的区别。

asyncio[2] “是一个使用 async/await 语法编写并发代码的库”,它在单个处理器上运行。

multiprocessing[3] “是一个支持使用 API 生产进程的包 […] 允许程序员充分利用给定机器上的多个处理器”。每个进程将在不同的 CPU 中启动自己的 Python 解释器。

IO 密集型意味着程序将受 I/O 影响而变得运行缓慢。在我们的案例中,主要指的是网络请求。

CPU 密集型意味着程序会由于 CPU 计算压力导致运行缓慢 — 例如数学计算。

为什么这会影响我们选择用于并发的库?因为并发成本的很大一部分是创建和维护线程/进程。对于 CPU 密集型问题,在不同的 CPU 中拥有多个进程将会提升效率。但对于 I/O 密集型的场景,情况可能并非如此。

由于网页数据抓取主要受 I/O 限制,因此我们选择了 asyncio。但如果有疑问(或只是为了好玩),您可以使用 multiprocessing 尝试这个场景并比较一下结果。

抓取速度提升 3 倍!Python 的这个内置库你用上了吗?

顺序实现的版本

我们将从抓取 scrapeme.live 作为示例开始,这是一个专门用于测试的电子商务网站。

首先,我们将从顺序抓取的版本开始。以下几个片段是所有案例的一部分,因此它们将保持不变。

通过访问目标主页,我们发现它有 48 个子页面。由于是测试环境,这些子页面不会很快发生变化,我们会使用到以下两个常量:

base_url = "https://scrapeme.live/shop/page" 
pages = range(149# max page (48) + 1

现在,从目标产品中提取基础数据。为此,我们使用 requests.get 获取 HTML 内容,然后使用 BeautifulSoup 解析它。我们将遍历每个产品并从中获取一些基本信息。所有选择器都来自对内容的手动审查(使用 DevTools),但为简洁起见,我们不会在这里详细介绍。

import requests 
from bs4 import BeautifulSoup 
 
def extract_details(page): 
 # concatenate page number to base URL 
 response = requests.get(f"{base_url}/{page}/"
 soup = BeautifulSoup(response.text, "html.parser"
 
 pokemon_list = [] 
 for pokemon in soup.select(".product"): # loop each product 
  pokemon_list.append({ 
   "id": pokemon.find(class_="add_to_cart_button").get("data-product_id"), 
   "name": pokemon.find("h2").text.strip(), 
   "price": pokemon.find(class_="price").text.strip(), 
   "url": pokemon.find(class_="woocommerce-loop-product__link").get("href"), 
  }) 
 return pokemon_list

extract_details 函数将获取一个页码并将其连接起来,用于创建子页面的 URL。获取内容并创建产品数组后返回。这意味着返回的值将是一个字典列表,这是一个后续使用的必要细节。

我们需要为每个页面运行上面的函数,获取所有结果,并存储它们。

import csv 
 
# modified to avoid running all the pages unintentionally 
pages = range(13
 
def store_results(list_of_lists): 
 pokemon_list = sum(list_of_lists, []) # flatten lists 
 
 with open("pokemon.csv""w"as pokemon_file: 
  # get dictionary keys for the CSV header 
  fieldnames = pokemon_list[0].keys() 
  file_writer = csv.DictWriter(pokemon_file, fieldnames=fieldnames) 
  file_writer.writeheader() 
  file_writer.writerows(pokemon_list) 
 
list_of_lists = [ 
 extract_details(page) 
 for page in pages 

store_results(list_of_lists)

运行上面的代码将获得两个产品页面,提取产品(总共 32 个),并将它们存储在一个名为 pokemon.csv 的 CSV 文件中。 store_results 函数不影响顺序或并行模式下的抓取。你可以跳过它。

由于结果是列表,我们必须将它们展平以允许 writerows 完成其工作。这就是为什么我们将变量命名为list_of_lists(即使它有点奇怪),只是为了提醒大家它不是扁平的。

输出 CSV 文件的示例:

id name price url
759 Bulbasaur £63.00 https://scrapeme.live/shop/Bulbasaur/
729 Ivysaur £87.00 https://scrapeme.live/shop/Ivysaur/
730 Venusaur £105.00 https://scrapeme.live/shop/Venusaur/
731 Charmander £48.00 https://scrapeme.live/shop/Charmander/
732 Charmeleon £165.00 https://scrapeme.live/shop/Charmeleon/

如果您要为每个页面 (48) 运行脚本,它将生成一个包含 755 个产品的 CSV 文件,并花费大约 30 秒。

time python script.py 
 
real 0m31,806s 
user 0m1,936s 
sys 0m0,073s

asyncio 介绍

我们知道我们可以做得更好。如果我们同时执行所有请求,它应该花费更少时间,对吧?也许会和执行最慢的请求所花费的时间相等。

并发确实应该运行得更快,但它也涉及一些开销。所以这不是线性的数学改进。

为此,我们将使用上面提到的 asyncio。它允许我们在事件循环中的同一个线程上运行多个任务(就像 Javascript 一样)。它将运行一个函数,并在运行时允许时将上下文切换到不同的上下文。在我们的例子中,HTTP 请求允许这种切换。

我们将开始看到一个 sleep 一秒钟的示例。并且脚本应该需要一秒钟才能运行。请注意,我们不能直接调用 main。我们需要让 asyncio 知道它是一个需要执行的异步函数。

import asyncio 
 
async def main(): 
 print("Hello ..."
 await asyncio.sleep(1
 print("... World!"
 
asyncio.run(main())
time python script.py 
Hello ... 
... World! 
 
real 0m1,054s 
user 0m0,045s 
sys 0m0,008s

简单的并行代码

接下来,我们将扩展一个示例案例来运行一百个函数。它们每个都会 sleep 一秒钟并打印一个文本。如果我们按顺序运行它们大约需要一百秒。使用 asyncio,只需要一秒!

这就是并发背后的力量。如前所述,对于纯 I/O 密集型任务,它将执行得更快 – sleep 不是,但它对示例很重要。

我们需要创建一个辅助函数,它会 sleep 一秒钟并打印一条消息。然后,我们编辑 main 以调用该函数一百次,并将每个调用存储在一个任务列表中。最后也是关键的部分是执行并等待所有任务完成。这就是 asyncio.gather[4] 所做的事情。

import asyncio 
 
async def demo_function(i): 
 await asyncio.sleep(1
 print(f"Hello {i}"
 
async def main(): 
 tasks = [ 
  demo_function(i) 
  for i in range(0100
 ] 
 await asyncio.gather(*tasks) 
 
asyncio.run(main())

正如预期的那样,一百条消息和一秒钟的执行时间。完美!

使用 asyncio 进行抓取

我们需要将这些知识应用于数据抓取。遵循的方法是同时请求并返回产品列表,并在所有请求完成后存储它们。每次请求后或者分批保存数据可能会更好,以避免实际情况下的数据丢失。

我们的第一次尝试不会有并发限制,所以使用时要小心。在使用数千个 URL 运行它的情况下……好吧,它几乎会同时执行所有这些请求。这可能会给服务器带来巨大的负载,并可能会损害您的计算机。

requests 不支持开箱即用的异步,因此我们将使用 aiohttp [5] 来避免复杂化。 requests 可以完成这项工作,并且没有实质性的性能差异。但是使用 aiohttp 代码更具可读性。

import asyncio 
import aiohttp 
from bs4 import BeautifulSoup 
 
async def extract_details(page, session): 
 # similar to requests.get but with a different syntax 
 async with session.get(f"{base_url}/{page}/"as response: 
 
  # notice that we must await the .text() function 
  soup = BeautifulSoup(await response.text(), "html.parser"
 
  # [...] same as before 
  return pokemon_list 
 
async def main(): 
 # create an aiohttp session and pass it to each function execution 
 async with aiohttp.ClientSession() as session: 
  tasks = [ 
   extract_details(page, session) 
   for page in pages 
  ] 
  list_of_lists = await asyncio.gather(*tasks) 
  store_results(list_of_lists) 
 
asyncio.run(main())

CSV 文件应该像以前一样包含每个产品的信息 (共 755 个)。由于我们同时执行所有页面调用,结果不会按顺序到达。如果我们将结果添加到 extract_details 内的文件中,它们可能是无序的。但我们会等待所有任务完成然后处理它们,因此顺序性不会有太大影响。

time python script.py 
 
real 0m11,442s 
user 0m1,332s 
sys 0m0,060s

我们做到了!速度提升了 3 倍,但是……不应该是 40 倍吗?没那么简单。许多因素都会影响性能(网络、CPU、RAM 等)。

在这个演示页面中,我们注意到当执行多个调用时,响应时间会变慢,这可能是设计使然。一些服务器/提供商可以限制并发请求的数量,以避免来自同一 IP 的过多流量。它不是一种阻塞,而是一个队列。你会得到服务响应,但需要稍等片刻。

要查看真正的加速,您可以针对延迟[6]页面进行测试。这是另一个测试页面,它将等待 2 秒然后返回响应。

base_url = "https://httpbin.org/delay/2" 
#... 
 
async def extract_details(page, session): 
 async with session.get(base_url) as response: 
  #...

这里去掉了所有的提取和存储逻辑,只调用了延迟 URL 48 次,并在 3 秒内运行完毕。

time python script.py 
 
real 0m2,865s 
user 0m0,245s 
sys 0m0,031s

使用信号量限制并发

如上所述,我们应该限制并发请求的数量,尤其是针对单个域名。

asyncio 带有 Semaphore[7],一个将获取和释放锁的对象。它的内部功能将阻塞一些调用,直到获得锁,从而创建最大的并发性。

我们需要创建尽可能最大值的信号量。然后等待提取函数运行,直到 async with sem 可用。

max_concurrency = 3 
sem = asyncio.Semaphore(max_concurrency) 
 
async def extract_details(page, session): 
 async with sem: # semaphore limits num of simultaneous downloads 
  async with session.get(f"{base_url}/{page}/"as response: 
   # ... 
 
async def main(): 
  # ... 
 
loop = asyncio.get_event_loop() 
loop.run_until_complete(main())

它完成了工作,并且相对容易实现!这是最大并发设置为 3 的输出。

time python script.py 
 
real 0m13,062s 
user 0m1,455s 
sys 0m0,047s

这表明无限并发的版本并没有全速运行。如果我们将限制增加到 10,总时间与未限制的脚本运行时间相近。

使用 TCPConnector 限制并发

aiohttp 提供了一种替代解决方案,可提供进一步的配置。我们可以创建传入自定义 TCPConnector[8] 的客户端会话。

我们可以使用两个适合我们需求的参数来构建它:

  • limit – “同时连接的总数”。
  • limit_per_host – “限制同时连接到同一端点的连接数”(同一主机、端口和 is_ssl)。
max_concurrency = 10 
max_concurrency_per_host = 3 
 
async def main(): 
 connector = aiohttp.TCPConnector(limit=max_concurrency, limit_per_host=max_concurrency_per_host) 
 async with aiohttp.ClientSession(connector=connector) as session: 
  # ... 
 
asyncio.run(main())

这种写法也易于实施和维护!这是每个主机最大并发设置为 3 的输出。

time python script.py 
 
real 0m16,188s 
user 0m1,311s 
sys 0m0,065s

Semaphore 相比的优势是可以选择限制每个域的并发调用和请求的总量。我们可以使用同一个会话来抓取不同的站点,每个站点都有自己的限制。

缺点是它看起来有点慢。需要针对真实案例,使用更多页面和实际数据运行一些测试。

multiprocessing

就像我们之前看到的那样,数据抓取是 I/O 密集型的。但是,如果我们需要将它与一些 CPU 密集型计算混合怎么办?为了测试这种情况,我们将使用一个函数,该函数将在每个抓取的页面之后 count_a_lot。这是强制 CPU 忙碌一段时间的简单(且有些愚蠢)的方法。

def count_a_lot(): 
 count_to = 100_000_000 
 counter = 0 
 while counter < count_to: 
  counter = counter + 1 
 
async def extract_details(page, session): 
 async with session.get(f"{base_url}/{page}/"as response: 
  # ... 
  count_a_lot() 
  return pokemon_list

对于 asyncio 版本,只需像以前一样运行它。可能需要很长时间⏳。

time python script.py 
 
real 2m37,827s 
user 2m35,586s 
sys 0m0,244s

现在,比较难理解的部分来了:

直接引入 multiprocessing 看起来有点困难。实际上,我们需要创建一个 ProcessPoolExecutor,它能够“使用一个进程池来异步执行调用”。它将处理不同 CPU 中每个进程的创建和控制

但它不会分配负载。为此,我们将使用 NumPyarray_split,它会根据 CPU 的数量将页面范围分割成相等的块。

main 函数的其余部分类似于 asyncio 版本,但更改了一些语法以匹配 multiprocessing 的语法风格。

此处的本质区别是我们不会直接调用extract_details。实际上是可以的,但我们将尝试通过将 multiprocessingasyncio 混合使用来获得最好的执行效率。

from concurrent.futures import ProcessPoolExecutor 
from multiprocessing import cpu_count 
import numpy as np 
 
num_cores = cpu_count() # number of CPU cores 
 
def main(): 
 executor = ProcessPoolExecutor(max_workers=num_cores) 
 tasks = [ 
  executor.submit(asyncio_wrapper, pages_for_task) 
  for pages_for_task in np.array_split(pages, num_cores) 
 ] 
 doneTasks, _ = concurrent.futures.wait(tasks) 
 
 results = [ 
  item.result() 
  for item in doneTasks 
 ] 
 store_results(results) 
 
main()

长话短说,每个 CPU 进程都会有几页需要抓取。一共有 48 个页面,假设你的机器有 8 个 CPU,每个进程将请求 6 个页面(6 * 8 = 48)。

这六个页面将同时运行!之后,计算将不得不等待,因为它们是 CPU 密集型的。但是我们有很多 CPU,所以它们应该比纯 asyncio 版本运行得更快。

async def extract_details_task(pages_for_task): 
 async with aiohttp.ClientSession() as session: 
  tasks = [ 
   extract_details(page, session) 
   for page in pages_for_task 
  ] 
  list_of_lists = await asyncio.gather(*tasks) 
  return sum(list_of_lists, []) 
 
 
def asyncio_wrapper(pages_for_task): 
 return asyncio.run(extract_details_task(pages_for_task))

这就是神奇的地方。每个 CPU 进程将使用页面的子集启动一个 asyncio(例如,第一个页面从 1 到 6)。

然后,每一个都将调用几个 URL,使用已知的 extract_details 函数。

上述内容需要花点时间来吸收它。整个过程是这样的:

  1. 创建执行器
  2. 拆分页面
  3. 每个进程启动 asyncio
  4. 创建一个 aiohttp 会话并创建页面子集的任务
  5. 提取每一页的数据
  6. 合并并存储结果

下面是本次的执行时间。虽然之前我们没有提到它,但这里的 user 时间却很显眼。对于仅运行 asyncio 的脚本:

time python script.py 
 
real 2m37,827s 
user 2m35,586s 
sys 0m0,244s

具有 asyncio 和多个进程的版本:

time python script.py 
 
real 0m38,048s 
user 3m3,147s 
sys 0m0,532s

发现区别了吗?实际运行时间方面第一个用了两分钟多,第二个用了 40 秒。但是在总 CPU 时间(user 时间)中,第二个超过了三分钟!看起来系统开销的耗时确实有点多。

这表明并行处理“浪费”了更多时间,但程序是提前完成的。显然,您在决定选择哪种方法时,需要考虑到开发和调试的复杂度。

结论

我们已经看到 asyncio 足以用于抓取,因为大部分运行时间都用于网络请求,这种场景属于 I/O 密集型并且适用于单核中的并发处理。

如果收集的数据需要一些 CPU 密集型工作,这种情况就会改变。虽然有关计数的例子有一点愚蠢,但至少你理解了这种场景。

在大多数情况下,带有 aiohttpasyncio 比异步的 requests 更适合完成目标工作。同时我们可以添加自定义连接器以限制每个域名的请求数、并发请求总数。有了这三个部分,您就可以开始构建一个可以扩展的数据抓取程序了。

另一个重要的部分是允许新的 URL/任务加入程序运行(类似于队列),但这是另一篇文章的内容。敬请关注!

参考资料

[1]

 

python 3: https://www.python.org/downloads/

[2]

asyncio: https://docs.python.org/3/library/asyncio.html

[3]

multiprocessing: https://docs.python.org/3/library/multiprocessing.html

[4]

asyncio.gather: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

[5]

aiohttp: https://docs.aiohttp.org/

[6]

延迟: https://httpbin.org/delay/2

[7]

Semaphore: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore

[8]

TCPConnector: https://docs.aiohttp.org/en/stable/client_reference.html#tcpconnector

[9]

参考原文: https://www.zenrows.com/blog/speed-up-web-scraping-with-concurrency-in-python

 

 

– EOF –

 

转自:https://mp.weixin.qq.com/s/pPpc7CH4Rr9WTa4p8ue2Iw

python-office库

1行代码,生成动漫头像

听说某宝需要50块钱一张?别再去交智商税了!

1. 安装python-office

安装很简单,在有python环境的电脑上,只需要执行下面这一行命令。

“ 如果你之前使用过python-office这个库,也需要执行一下,可以下载到最新版本~

安装

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple python-office -U

如果你的电脑里还没有安装python环境,可以看一下下面这个6分钟的傻瓜式安装教程,有电脑就能操作~

作者:程序员晚枫
链接:https://zhuanlan.zhihu.com/p/531681488
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

2. 生成动漫头像

直接上代码!

代码

# 导入这个库:python-office,简写为office
import office

# 1行代码,实现 人像转动漫头像
office.image.img2Cartoon(path = 'd://image//程序员晚枫.jpg')

# 参数说明:
# path:存放自己真人照片的位置 + PDF的文件名,例如:d://image//程序员晚枫.pdf

直接运行以上代码,就会得到一张转化后的动漫头像了。

“ 程序可能需要运行20秒左右。

3.全部功能

1行代码实现复杂功能,是不是使用起来很方便?

“ 项目已被收录进【开源中国】、【Python官网】等平台,所有功能,免费给大家使用:GitHub

4. python-office库,近期添加的功能

10个有趣的 Python 高级脚本,建议收藏!

来自公众号:法纳斯特

大家好,我是小F。

 

在日常的工作中,我们总会面临到各式各样的问题。

 

其中不少的问题,使用一些简单的Python代码就能解决。

 

比如不久前的复旦大佬,用130行Python代码硬核搞定核酸统计,大大提升了效率,节省了不少时间。

 

今天,小F就带大家学习一下10个Python脚本程序。

 

虽然简单,不过还是蛮有用的。

 

有兴趣的可以自己去实现,找到对自己有帮助的技巧。

 

 

▍1、Jpg转Png

 

图片格式转换,以前小F可能第一时间想到的是【格式工厂】这个软件。

 

如今编写一个Python脚本就能完成各种图片格式的转换,此处以jpg转成png为例。

 

有两种解决方法,都分享给大家。

 

# 图片格式转换, Jpg转Png

# 方法①
from PIL import Image

img = Image.open(‘test.jpg’)
img.save(‘test1.png’)


# 方法②
from cv2 import imread, imwrite

image = imread(“test.jpg”1)
imwrite(“test2.png”, image)

 

 

▍2、PDF加密和解密

 

如果你有100个或更多的PDF文件需要加密,手动进行加密肯定是不可行的,极其浪费时间。

 

使用Python的pikepdf模块,即可对文件进行加密,写一个循环就能进行批量加密文档。

 

# PDF加密
import pikepdf

pdf = pikepdf.open(“test.pdf”)
pdf.save(‘encrypt.pdf’, encryption=pikepdf.Encryption(owner=“your_password”, user=“your_password”, R=4))
pdf.close()

 

有加密那么便会有解密,代码如下。

# PDF解密
import pikepdf

pdf = pikepdf.open(“encrypt.pdf”,  password=‘your_password’)
pdf.save(“decrypt.pdf”)
pdf.close()

 

 

▍3、获取电脑的配置信息

 

很多小伙伴可能会使用鲁大师来看自己的电脑配置,这样还需要下载一个软件。

 

使用Python的WMI模块,便可以轻松查看你的电脑信息。

 

# 获取计算机信息
import wmi


def System_spec():
    Pc = wmi.WMI()
    os_info = Pc.Win32_OperatingSystem()[0]
    processor = Pc.Win32_Processor()[0]
    Gpu = Pc.Win32_VideoController()[0]
    os_name = os_info.Name.encode(‘utf-8’).split(b’|’)[0]
    ram = float(os_info.TotalVisibleMemorySize) / 1048576

    print(f’操作系统: {os_name})
    print(f’CPU: {processor.Name})
    print(f’内存: {ram} GB’)
    print(f’显卡: {Gpu.Name})

    print(“n计算机信息如上 ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑”)


System_spec()

 

就以小F自己的电脑为例,运行代码就能看到配置。

 

10个有趣的 Python 高级脚本,建议收藏!

 

 

▍4、解压文件

 

使用zipfile模块进行文件解压,同理也可以对文件进行压缩。

 

# 解压文件
from zipfile import ZipFile

unzip = ZipFile(“file.zip”“r”)
unzip.extractall(“output Folder”)

 

 

▍5、Excel工作表合并

 

帮助你将Excel工作表合并到一张表上,表内容如下图。

 

10个有趣的 Python 高级脚本,建议收藏!

 

6张表,其余表的内容和第一张表都一样。

 

设置表格数量为5,将会合并前5张表的内容。

 

import pandas as pd

# 文件名
filename = “test.xlsx”
# 表格数量
T_sheets = 5

df = []
for i in range(1, T_sheets+1):
    sheet_data = pd.read_excel(filename, sheet_name=i, header=None)
    df.append(sheet_data)

# 合并表格
output = “merged.xlsx”
df = pd.concat(df)
df.to_excel(output)

 

结果如下。

 

10个有趣的 Python 高级脚本,建议收藏!

 

 

▍6、将图像转换为素描图

 

和之前的图片格式转换有点类似,就是对图像进行处理。

 

以前大家可能会使用到美图秀秀,现在可能就是抖音的滤镜了。

 

其实使用Python的OpenCV,就能够快速实现很多你想要的效果。

 

# 图像转换
import cv2

# 读取图片
img = cv2.imread(“img.jpg”)
# 灰度
grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
invert = cv2.bitwise_not(grey)
# 高斯滤波
blur_img = cv2.GaussianBlur(invert, (77), 0)
inverse_blur = cv2.bitwise_not(blur_img)
sketch_img = cv2.divide(grey, inverse_blur, scale=256.0)
# 保存
cv2.imwrite(‘sketch.jpg’, sketch_img)
cv2.waitKey(0)
cv2.destroyAllWindows()

 

原图如下。

 

10个有趣的 Python 高级脚本,建议收藏!

 

素描图如下,还挺好看的。

 

10个有趣的 Python 高级脚本,建议收藏!

 

 

▍7、获取CPU温度

 

有了这个Python脚本,你将不需要任何软件来了解CPU的温度。

 

# 获取CPU温度
from time import sleep
from pyspectator.processor import Cpu
cpu = Cpu(monitoring_latency=1)
with cpu:
    while True:
        print(f’Temp: {cpu.temperature} °C’)
        sleep(2)

 

 

▍8、提取PDF表格

 

有的时候,我们需要从PDF中提取表格数据。

 

第一时间你可能会先想到手工整理,但是当工作量特别大,手工可能就比较费劲。

 

然后你可能会想到一些软件和网络工具来提取 PDF 表格。

 

下面这个简单的脚本将帮助你在一秒钟内完成相同的操作。

 

# 方法①
import camelot

tables = camelot.read_pdf(“tables.pdf”)
print(tables)
tables.export(“extracted.csv”, f=“csv”, compress=True)

# 方法②, 需要安装Java8
import tabula

tabula.read_pdf(“tables.pdf”, pages=“all”)
tabula.convert_into(“table.pdf”“output.csv”, output_format=“csv”, pages=“all”)

 

PDF文档的内容如下,包含了一个表格。

 

10个有趣的 Python 高级脚本,建议收藏!

 

提取到的CSV文件内容如下。

 

10个有趣的 Python 高级脚本,建议收藏!

 

 

▍9、截图

 

该脚本将简单地截取屏幕截图,而无需使用任何屏幕截图软件。

 

在下面的代码中,给大家展示了两种Python截取屏幕截图的方法。

 

# 方法①
from mss import mss
with mss() as screenshot:
    screenshot.shot(output=‘scr.png’)

# 方法②
import PIL.ImageGrab
scr = PIL.ImageGrab.grab()
scr.save(“scr.png”)

 

 

▍10、拼写检查器

 

这个Python脚本可以进行拼写检查,当然只对英文有效,毕竟中文博大精深呐。

 

# 拼写检查
# 方法①
import textblob

text = “mussage”
print(“original text: “ + str(text))

checked = textblob.TextBlob(text)
print(“corrected text: “ + str(checked.correct()))

# 方法②
import autocorrect
spell = autocorrect.Speller(lang=‘en’)

# 以英语为例
print(spell(‘cmputr’))
print(spell(‘watr’))
print(spell(‘survice’))

— EOF —

转自:https://mp.weixin.qq.com/s/wuA5_ZV328E5bpSSR0XbiA

FastAPI的小兄弟,开发命令行工具更给力

来源丨经授权转自 未闻Code(ID:itskingname)

作者丨kingname

关注我公众号的同学都知道,我非常喜欢FastAPI这个web框架。它在易用性上面做到了极致,帮助开发者减少了很多不必要的工作。

FastAPI的开发组织叫做tiangolo,他家除了FastAPI外,还有另一个项目也非常好用,叫做typer

三年前,我写过一篇文章一日一技:快速实现Python 命令行参数介绍另一个命令行工具fire. 而typer做得比fire还要好。

首先使用pip来安装它:

python3 -m pip install typer

函数参数等于命令行参数

我们首先来看看typer怎么使用。创建一个example_1.py文件,写入如下代码。

import typer


def main(name: str, salary: int):
    print(f'{name}月薪{salary}元')


if __name__ == '__main__':
    typer.run(main)

直接运行,Python会报错:

FastAPI的小兄弟,开发命令行工具更给力

使用参数--help可以查看这个脚本的命令行参数:

FastAPI的小兄弟,开发命令行工具更给力

于是我们根据这里的提示,输入正确的参数,从而正常运行程序:

FastAPI的小兄弟,开发命令行工具更给力

子命令与自动补全更好用

假设我们有一个神经网络的程序,其中的入口函数代码如下:

def train_data(train_folder: str, test_folder: str, rate: float = 0.8):
    """
    训练人脸检测模型
    """
    print(f'使用文件夹{train_folder}中的数据进行训练')
    print(f'使用{test_folder}中的数据用来验证训练效果,确保准确率>{rate}')
    return True

def predict(folder: str):
    """
    使用训练好的模型预测
    """
    print(f'对文件夹{folder}中的数据进行预测。')

显然,这个程序可以用来训练数据,也可以用来预测数据,所以有两种不同的命令,每一种命令有不同的参数。

这种情况下,使用typer非常方便,只需要加两个装饰器就可以了:

FastAPI的小兄弟,开发命令行工具更给力

运行效果如下图所示:

FastAPI的小兄弟,开发命令行工具更给力

输入具体的子命令,还可以查看每个子命令的参数:

FastAPI的小兄弟,开发命令行工具更给力

因此,我可以使用两个不同的子命令来运行程序:

FastAPI的小兄弟,开发命令行工具更给力

你以为这样就完了?我们再安装它的一个辅助工具typer-cli,还可以做更多事情:

python3 -m pip install typer-cli
typer --install-completion

有了这个东西,我们运行程序可以这样写:

typer example_2.py run 子命令 参数1 参数2 --可选参数1 可选参数1的值

例如:

FastAPI的小兄弟,开发命令行工具更给力

并且,typer可以帮我们可以实现自动补全:

输入typer example_2.py run 然后按下Tab键,自动告诉你可以输入哪些子命令,如下图所示:

FastAPI的小兄弟,开发命令行工具更给力

除此之外,如果你的命令行程序只有一个命令,那么你甚至只需要写一个函数,连typer都不需要导入,就可以使用typer来运行:

FastAPI的小兄弟,开发命令行工具更给力

自动生成文档也简单

我们知道,FastAPI自动生成接口文档的功能非常好用。typer作为它的兄弟,也继承了这个高级功能。我们来看看:

import typer

app = typer.Typer(help="人脸检测模型")


@app.command()
def train_data(train_folder: str, test_folder: str, rate: float = 0.8):
    """
    训练人脸检测模型
    """
    print(f'使用文件夹{train_folder}中的数据进行训练')
    print(f'使用{test_folder}中的数据用来验证训练效果,确保准确率>{rate}')
    return True

@app.command()
def predict(folder: str):
    """
    使用训练好的模型预测
    """
    print(f'对文件夹{folder}中的数据进行预测。')

运行命令:

typer main.py utils docs --name "python3 main.py" --output readme.md

自动在当前文件夹生成一个readme.md文件。我们使用任何能够渲染Markdown的软件打开这个文档,可以看到文档内容如下:

FastAPI的小兄弟,开发命令行工具更给力

这样一来,我们不需要额外花心思去维护文档,只需要在修改完代码、增删新的命令或者参数以后,运行这个命令,就可以把文档自动更新。

转自:https://mp.weixin.qq.com/s/jE5cQtlmXXxJ9-eTpXsW-w

社区发现之标签传播算法(LPA)

在Graph领域,社区发现(Community detection)是一个非常热门且广泛的话题,后面会写一个系列,该问题实际上是从子图分割的问题演变而来,在真实的社交网络中,有些用户之间连接非常紧密,有些用户之间的连接较为稀疏,连接紧密的用户群体可以看做一个社区,在风控问题中,可以简单的理解为团伙挖掘。

目前的社区发现问题分为两大类:非重叠社区发现和重叠社区发现。非重叠社区发现问题描述的是:一个网络中,每个节点均只能属于同一个社区,这意味这社区和社区之间是没有交集的。在非重叠社区发现算法中,有不同种类的解法:
1)基于模块度的社区发现算法:基本思想是通过定义模块度(Modularity)来衡量一个社区的划分是不是相对比较好的结果,从而将社区发现问题转化为最大化模块度的问题进行求解,后续的Louvain算法会讲到。
2)基于标签传播的社区发现算法:基本思想是通过标记节点的标签信息来更新未标记节点的标签信息,在整个网络中进行传播,直至收敛,其中最具代表性的就是标签传播算法(LPA,Label Propagation Algorithm),也是本文要讨论的算法。
注意:在团伙挖掘的实际应用的过程中,不要寄希望于优化社区发现算法提高准确性,可能永远都解决不了问题,因为关系的形成在实际中太过于复杂,我们更多的关注构图关系的筛选、清洗、提纯,以及分群后进一步加工处理
 

一、LPA概述

Label Propagation Algorithm,也称作标签传播算法(LPA),是一个在图中快速发现社群的算法,由Raghavan等人在2007年于论文《Near linear time algorithm to detect community structures in large-scale networks》中提出。在 LPA 算法中,节点的标签完全由它的直接邻居决定。标签传播算法是一种基于标签传播的局部社区发现算法,其基本思想是节点的标签(community)依赖其邻居节点的标签信息,影响程度由节点相似度决定,并通过传播迭代更新达到稳定。

1、算法的思想

在用一个唯一的标签初始化每个节点之后,该算法会重复地将一个节点的标签社群化为该节点的相邻节点中出现频率最高的标签。当每个节点的标签在其相邻节点中出现得最频繁时,算法就会停止。该算法是异步的,因为每个节点都会在不等待其余节点更新的情况下进行更新。
该算法有5个步骤:
1)初始化网络中所有节点的标签,对于给定节点x,Cx(0)=x。
2)设置 t=1。
3)以随机顺序排列网络中的节点,并将其设置为x。
4)对于特定顺序选择的每个x∈X,让Cx(t)=f(Cxi1(t),…,Cxim(t),…。f这里返回相邻标签中出现频率最高的标签。如果有多个最高频率的标签,就随机选择一个标签。
5)如果每个节点都有其邻居节点中数量最多的标签,则停止算法,否则,设置t=t+1并转到3。
这是一个迭代的计算过程且不保证收敛,大体的思路就是每个人都看看自己的邻居都在什么社区内,看看频率最高的社区是啥,如果和自己当前的社区不一样,就把这个最高频社区当成是自己的社区,然后告诉邻居,周而复始,直到对于所有人,邻居们告诉自己的高频社区和自己当前的社区是一样的,算法结束。所以说对于这个算法,计算复杂度是O(kE),k是迭代的次数,E是边的数量。大家的经验是这个迭代的次数大概是5次就能近似收敛,以实现精度和性能的平衡,能发现这个数字和六度分隔理论里面的数字也差不多。
我们可以很形象地理解算法的传播过程,当标签在紧密联系的区域,传播非常快,但到了稀疏连接的区域,传播速度就会下降。当出现一个节点属于多个社群时,算法会使用该节点邻居的标签与权重,决定最终的标签,传播结束后,拥有同样标签的节点被视为在同一群组中。
下图展示了算法的两个变种:Push 和 Pull。其中 Pull 算法更为典型,并且可以很好地并行计算:
社区发现之标签传播算法(LPA)
我们不再继续深入,看完上图,你应该已经理解了算法的大概过程。其实,做过图像处理的人很容易明白,所谓的标签传播算法,不过是图像分割算法的变种,Push 算法是区域生长法(Region Growing)的简化版,而 Pull 更像是分割和合并(divide-and-merge,也有人称 split-merge)算法。确实,图像(image)的像素和图(graph)的节点是十分类似的。

2、用于图聚类

图聚类是根据图的拓扑结构,进行子图的划分,使得子图内部节点的链接较多,子图之间的连接较少。依赖其邻居节点的标签信息,影响程度由节点相似度决定,并通过传播迭代更新达到稳定。
参考原始论文
https://arxiv.org/abs/0709.2938
https://arxiv.org/pdf/0709.2938.pdf
在算法开始之前为每个节点打上不同的标签,每一个轮次随机找到一个节点,查看其邻居节点的标签,找到出现次数最多的标签,随后将该节点改成该标签。当相邻两次迭代后社区数量不变或社区内节点数量不变时则停止迭代,下面看图解过程
初始化
社区发现之标签传播算法(LPA)
第一轮迭代
随机挑选一个节点(如c),发现其相邻节点有abe,三者出现次数相同,故随机选一个(如a),那么c点的标签被a替代。
社区发现之标签传播算法(LPA)
第二轮迭代
随机挑选一个节点(如b),发现其相邻节点均为a,故将b换成a,重复数次,最终的结果如图所示
社区发现之标签传播算法(LPA)
我们再看一个例子,比如下图:
社区发现之标签传播算法(LPA)
分组后的结果如下,我们得到了独立非重的groupid,这个结果其实是很难在实际场景中应用的,那么我们就的结果就没有意义了么?这个可以帮我们定位到浓度很高的群体,然后再加上部分属性标签,就能轻而易举的识别出问题黑产了。
社区发现之标签传播算法(LPA)

3、用于半监督

该算法也可以作为半监督的分类算法,标签传播时一种半监督机器学习算法,它将标签分配给以前未标记的数据点。在算法开始时,数据点的子集(通常只占很小一部分)有标签(或分类)。在整个算法过程中,这些标签会传播到未标记的点。在标签传播过程中,保持已标注数据的标签不变,使其像一个源头把标签传向未标注数据。
最终,当迭代过程结束时,相似节点的概率分布也趋于相似,可以划分到同一个类别中,从而完成标签传播过程,边的权重越大,表示两个节点越相似,那么label越容易传播过去。我们定义一个NxN的概率转移矩阵P:
社区发现之标签传播算法(LPA)
下面的图来看看传播过程
社区发现之标签传播算法(LPA)
传播结束后的结果如下:
社区发现之标签传播算法(LPA)
LPA使用已标记节点的标签作为基础,并尝试预测未标记节点的标签。然而,如果最初的标签是错误的,这可能会影响标签的传播过程,错误的标签可能会被传播。该算法是概率性的,并且发现的社区可能因执行的不同而不同。
 

二、算法代码实现

这个算法比较简单,有比较多的实现方式,最方便的还是networkx这个库,并用里面的一个简单的数据集进行试验。

1、数据集介绍

空手道数据集是一个比较简单的图数据集,下面我们看看其中的边和节点,后面应用这个数据集进行试验。
import networkx as nxG = nx.karate_club_graph() # 空手道G.nodes()NodeView((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33))
G.edges() EdgeView([(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8),(0, 10), (0, 11), (0, 12), (0, 13), (0, 17), (0, 19), (0, 21), (0, 31), (1, 2), (1, 3), (1, 7), (1, 13), (1, 17), (1, 19), (1, 21), (1, 30), (2, 3), (2, 7), (2, 8), (2, 9), (2, 13), (2, 27), (2, 28), (2, 32), (3, 7), (3, 12), (3, 13), (4, 6), (4, 10), (5, 6), (5, 10), (5, 16), (6, 16), (8, 30), (8, 32), (8, 33), (9, 33), (13, 33), (14, 32), (14, 33), (15, 32), (15, 33), (18, 32), (18, 33), (19, 33), (20, 32), (20, 33), (22, 32), (22, 33), (23, 25), (23, 27), (23, 29), (23, 32), (23, 33), (24, 25), (24, 27), (24, 31), (25, 31), (26, 29), (26, 33),(27, 33), (28, 31), (28, 33), (29, 32), (29, 33), (30, 32), (30, 33),(31, 32), (31, 33), (32, 33)])

2、自己实现LPA算法

import randomimport networkx as nximport matplotlib.pyplot as plt # 应该封装成类的形式 def lpa(G): ''' 异步更新方式 G:图 return:None 通过改变节点的标签,最后通过标签来划分社区 算法终止条件:迭代次数超过设定值 ''' max_iter_num = 0 # 迭代次数 while max_iter_num < 10: max_iter_num += 1 print('迭代次数',max_iter_num) for node in G: count = {} # 记录邻居节点及其标签 for nbr in G.neighbors(node): # node的邻居节点 label = G.nodes[nbr]['labels'] count[label] = count.setdefault(label,0) + 1 #找到出现次数最多的标签 count_items = sorted(count.items(),key=lambda x:-x[-1]) best_labels = [k for k,v in count_items if v == count_items[0][1]] #当多个标签最大技术值相同时随机选取一个标签 label = random.sample(best_labels,1)[0] # 返回的是列表,所以需要[0] G.nodes[node]['labels'] = label # 更新标签 def draw_picture(G): # 画图 node_color = [float(G.nodes[v]['labels']) for v in G] pos = nx.spring_layout(G) # 节点的布局为spring型 plt.figure(figsize = (8,6)) # 图片大小 nx.draw_networkx(G,pos=pos,node_color=node_color) plt.show() if __name__ == "__main__": G = nx.karate_club_graph() #空手道数据集 # 给节点添加标签 for node in G: G.add_node(node, labels = node) #用labels的状态 lpa(G) com = set([G.nodes[node]['labels'] for node inG]) print('社区数量',len(com)) draw_picture(G)
迭代次数 1迭代次数 2迭代次数 3迭代次数 4迭代次数 5迭代次数 6迭代次数 7迭代次数 8迭代次数 9迭代次数 10社区数量 3
代码运行结果:
社区发现之标签传播算法(LPA)

3、调包实现LPA算法

networkx集成了这个算法,可以直接调用
import matplotlib.pyplot as pltimport networkx as nxfrom networkx.algorithms.community import asyn_lpa_communities as lpa
# 空手道俱乐部G = nx.karate_club_graph()com = list(lpa(G))print('社区数量',len(com))
com [{0, 1, 2, 3, 7, 8, 9, 11, 12, 13, 17, 19, 21, 30},{4, 5, 6, 10, 16},{14, 15, 18, 20, 22, 23, 24, 25, 26, 27, 28, 29, 31, 32, 33}]

# 下面是画图pos = nx.spring_layout(G) # 节点的布局为spring型NodeId = list(G.nodes())node_size = [G.degree(i)**1.2*90 for i in NodeId] # 节点大小
plt.figure(figsize = (8,6)) # 设置图片大小nx.draw(G,pos, with_labels=True, node_size =node_size, node_color='w', node_shape = '.' )
'''node_size表示节点大小node_color表示节点颜色with_labels=True表示节点是否带标签'''color_list = ['pink','orange','r','g','b','y','m','gray','black','c','brown']
for i in range(len(com)): nx.draw_networkx_nodes(G, pos, nodelist=com[i], node_color = color_list[i+2], label=True)plt.show()
社区发现之标签传播算法(LPA)

三、分群结果可视化

在可视化方面,确实R语言要强,大家有时间可以学习下,活儿全还是有点用处的,我们这里用R的igraph包来展现一些社区发现的结果。
library('igraph')karate <- graph.famous("Zachary")community <- label.propagation.community(karate)# 计算模块度modularity(community)0.3717949
#membership查看每个点的各自分组情况。membership(community)1 1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 2 1 2 1 2 2 2 2 2 2 2 2 2 2 2 2
plot(community,karate)
下面为两次跑的结果,可以看到,两次的结果并不一样,这个就是震荡效应导致的结果
社区发现之标签传播算法(LPA)
换一个对比下看看
community <- walktrap.community(karate, weights = E(karate)$weight, steps = 8, merges =TRUE, modularity = TRUE)plot(community,karate)
社区发现之标签传播算法(LPA)
可以用更复杂的数据,画出来还挺好看的,数据集的下载地址:http://snap.stanford.edu/data/egonets-Facebook.html
library(igraph)library(d3Network)igraphDat <- read.graph(file = "/Users/wuzhengxiang/Documents/PPT模板/0.edges", directed = FALSE)
## Simplify to remove duplications and from-self-to-self loopsigraphDat <- simplify(igraphDat, remove.multiple = TRUE, remove.loops = TRUE )
## Give numbersV(igraphDat)$label <- seq_along(V(igraphDat))
## Average path length between any two given nodes(averagePathLength <- average.path.length(igraphDat))
## Community structure detection based on edge betweennesscommunityEdgeBetwn <- edge.betweenness.community(igraphDat)
## Check the transitivity of a graph (probability that the adjacent vertices of a vertex are connected)(transitivityDat <- transitivity(igraphDat, type = "localaverage", isolates = "zero") )
## Set the seed to get the same resultset.seed("20200513")
## Add community indicating background colorsplot(igraphDat,vertex.color = communityEdgeBetwn$membership, vertex.size = log(degree(igraphDat) + 1),mark.groups = by(seq_along(communityEdgeBetwn$membership), communityEdgeBetwn$membership, invisible) )
## Annotatetitle("Stanford Facebook data", sub = "http://snap.stanford.edu/data/egonets-Facebook.html" )text(x = -1, y = -1, labels = sprintf("Average path length: %.2fnTransitivity: %.2f", averagePathLength, transitivityDat) )
社区发现之标签传播算法(LPA)
社区发现之标签传播算法(LPA)
社区发现之标签传播算法(LPA)

四、算法优缺点

作为一个比较简单的算法,其优缺点也是特别的明显。

1、算法优点

算法逻辑简单,时间复杂度低,接近线性复杂度,在超大规模网络下会有优异的性能,适合做社区发现的baseline。
无须定义优化函数,无须事先指定社区个数,算法会利用自身的网络结构来指导标签传播。

2、算法缺点

雪崩效应:社区结果不稳定,随机性强。由于当邻居节点的社区标签权重相同时,会随机取一个。导致传播初期一个小的错误被不断放大,最终没有得到合适的结果。尤其是异步更新时,更新顺序的不同也会导致最终社区划分结果不同。
社区发现之标签传播算法(LPA)
上图中展示了一次标签传播算法的流程:初始化阶段,每个节点都以自己作为社区标签。比如a的社区就是a,c的社区就是c。当进入传播阶段,节点c的邻居节点共4个:a,b,c,d。而社区标签也是4个:a,b,c,d,假设随机取了一个a。
如果是异步更新,此时b,d,e三个节点的邻居节点中社区标签均存在2个a,所以他们都会立马更新成a。如果c当时随机选择的是b,那么d,e就会更新成b,从而导致b社区标签占优,而最终的社区划分也就成b了。
震荡效应:社区结果来回震荡,不收敛,当传播方式处于同步更新的时候,尤其对于二分图或子图存在二分图的结构而言,极易发生。
社区发现之标签传播算法(LPA)
上图中展示了一次二分图中标签传播算法的流程,在同步更新的时候,每个节点依赖的都是上一轮迭代的社区标签。当二分图左边都是a,右边都是b时,a社区的节点此时邻居节点都是b,b社区的节点此时邻居节点都是a,根据更新规则,此时a社区的节点将全部更新为b,b社区的节点将全部更新为a。此时算法无法收敛,使得整个网络处于震荡中。
转自:https://mp.weixin.qq.com/s/DgCK9Ea-Qf00KyB1W8x8Iw