+ 我要发布
我发布的 我的标签 发现
浏览器扩展
斑点象@Edge

Python 的 multiprocessing 模块使用指南

在本文中,我们将学习如何使用 multiprocessing 模块中的特定 Python 类,即 process 类。我将通过示例快速概述一下。 什么是 Python 的 multiprocessing 模块? 除了从官方文档中提取内容外,还有什么更好的方法来描述这个模块呢?multiprocessing 是一个支持使用类似于 threading 模块的 API 生成进程的包。multiprocessing 包提供本地和远程并发,通过使用子进程而不是线程来有效地避开全局解释器锁。 本文的重点不是 threading 模块,但总之,threading 模块将处理一小段代码执行(轻量级且共享内存),而 multiprocessing 将处理整个程序执行(更重,且完全隔离)。 如果你想了解更多关于进程和线程之间的区别,请阅读 Jong Hyuck Won 的这篇精彩文章,《进程与线程:有什么区别?》 通常,multiprocessing 模块提供了许多其他类、函数和实用程序,可以在程序执行期间使用它们来处理多个进程执行。该模块专为在工作流程中需要应用并行性的程序而设计,是与之交互的主要点。我们不会介绍 multiprocessing 模块中的所有类和实用程序,而是将重点介绍一个非常特定的类,即 process 类。 什么是 process 类? 在本节中,我们将尝试更好地说明进程是什么,以及如何在 Python 中识别、使用和管理进程。正如 GNU C 库中所解释的:“进程是系统资源分配的基本单位。每个进程都有自己的地址空间和(通常)一个控制线程。进程执行一个程序;你可以让多个进程执行同一个程序,但每个进程在自己的地址空间中拥有该程序的自己的副本,并独立于其他副本执行它。” 但是,在 Python 中这是什么样子呢?到目前为止,我们已经设法对进程是什么进行了一些描述和引用,以及进程和线程之间的区别,但我们还没有涉及任何代码。好吧,让我们改变这种状况,在 Python 中执行一个简单的进程示例: ``` #!/usr/bin/env python import os #A very, very simple process. if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") ``` 输出如下: $ python /tmp/tmp.iuW2VAurGG/scratch.py Hi! I'm process 144112 任何正在运行的 Python 程序都是其自身的进程。 在父进程中创建子进程 在父进程中生成不同的子进程呢?为此,我们可以使用 multiprocessing 模块中的 Process 类,它看起来像这样: ``` #!/usr/bin/env python import os import multiprocessing def child_process(): print(f"Hi! I'm a child process {os.getpid()}") if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") #Here we create a new instance of the Process class and assign our #`child_process` function to be executed. process = multiprocessing.Process(target=child_process) #We then start the process process.start() #And finally, we join the process. This will make our script to hang and #wait until the child process is done. process.join() ``` 输出如下: $ python /tmp/tmp.iuW2VAurGG/scratch.py Hi! I'm process 144078 Hi! I'm a child process 144079 关于上一个脚本的一个非常重要的注意事项:如果你不使用process.join()来等待子进程执行并完成,那么该点的任何其他后续代码将实际执行,并且可能会变得有点难以同步你的工作流程。 例如: ``` #!/usr/bin/env python import os import multiprocessing def child_process(): print(f"Hi! I'm a child process {os.getpid()}") if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") #Here we create a new instance of the Process class and assign our #`child_process` function to be executed. process = multiprocessing.Process(target=child_process) #We then start the process process.start() #And finally, we join the process. This will make our script to hang and #wait until the child process is done. #process.join() print("AFTER CHILD EXECUTION! RIGHT?!") ``` 输出如下: $ python /tmp/tmp.iuW2VAurGG/scratch.py Hi! I'm process 145489 AFTER CHILD EXECUTION! RIGHT?! Hi! I'm a child process 145490 如果你想生成多个进程,则可以利用 for 循环(或任何其他类型的循环)。它们将让你创建对你需要的进程的尽可能多的引用,并在以后的某个阶段启动/加入它们。 ``` #!/usr/bin/env python import os import multiprocessing def child_process(id): print(f"Hi! I'm a child process {os.getpid()} with id#{id}") if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") list_of_processes = [] #Loop through the number 0 to 10 and create processes for each one of #them. for i in range(0, 10): #Here we create a new instance of the Process class and assign our #`child_process` function to be executed. Note the difference now that #we are using the `args` parameter now, this means that we can pass #down parameters to the function being executed as a child process. process = multiprocessing.Process(target=child_process, args=(i,)) list_of_processes.append(process) for process in list_of_processes: #We then start the process process.start() #And finally, we join the process. This will make our script to hang #and wait until the child process is done. process.join() ``` 输出如下: $ python /tmp/tmp.iuW2VAurGG/scratch.py Hi! I'm process 146056 Hi! I'm a child process 146057 with id#0 Hi! I'm a child process 146058 with id#1 Hi! I'm a child process 146059 with id#2 Hi! I'm a child process 146060 with id#3 Hi! I'm a child process 146061 with id#4 Hi! I'm a child process 146062 with id#5 Hi! I'm a child process 146063 with id#6 Hi! I'm a child process 146064 with id#7 Hi! I'm a child process 146065 with id#8 Hi! I'm a child process 146066 with id#9 子进程和父进程之间的数据通信 在上一节中,我描述了在 multiprocessing.Process 类构造函数中添加一个新参数 args。这个参数允许你将值传递给子进程,以便在函数内部使用。但是,你知道如何从子进程返回数据吗? 你可能会认为,要从子进程返回数据,必须在其中使用 return 语句才能实际检索数据。进程很棒,可以以隔离的方式执行函数,不会干扰共享资源,这意味着我们所知道的从函数返回数据的正常和通常方式在这里是不允许的,因为它是隔离的。 相反,我们可以使用队列类,它将为我们提供一个接口,以便在父进程和其子进程之间进行数据通信。在这种情况下,队列是一个普通的fifo(先进先出),具有与 multiprocessing 配合使用的内置机制。 例如: ``` #!/usr/bin/env python import os import multiprocessing def child_process(queue, number1, number2): print(f"Hi! I'm a child process {os.getpid()}. I do calculations.") sum = number1 + number2 #Putting data into the queue queue.put(sum) if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") #Defining a new Queue() queue = multiprocessing.Queue() #Here we create a new instance of the Process class and assign our #`child_process` function to be executed. Note the difference now that #we are using the `args` parameter now, this means that we can pass #down parameters to the function being executed as a child process. process = multiprocessing.Process(target=child_process, args=(queue,1, 2)) #We then start the process process.start() #And finally, we join the process. This will make our script to hang and #wait until the child process is done. process.join() #Accessing the result from the queue. print(f"Got the result from child process as {queue.get()}") ``` 输出如下: $ python /tmp/tmp.iuW2VAurGG/scratch.py Hi! I'm process 149002 Hi! I'm a child process 149003. I do calculations. Got the result from child process as 3 进程类的异常处理 处理异常是一项特殊且有些困难的任务,我们在使用进程模块时必须不时地经历。原因是,默认情况下,子进程中发生的任何异常都会由生成它的 Process 类处理。 以下代码会引发一个带文本的异常: ``` #!/usr/bin/env python import os import multiprocessing def child_process(): print(f"Hi! I'm a child process {os.getpid()}.") raise Exception("Oh no! :(") if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") #Here we create a new instance of the Process class and assign our #`child_process` function to be executed. Note the difference now that #we are using the `args` parameter now, this means that we can pass #down parameters to the function being executed as a child process. process = multiprocessing.Process(target=child_process) try: #We then start the process process.start() #And finally, we join the process. This will make our script to hang and #wait until the child process is done. process.join() print("AFTER CHILD EXECUTION! RIGHT?!") except Exception: print("Uhhh... It failed?") ``` 输出如下: $ python /tmp/tmp.iuW2VAurGG/scratch.py Hi! I'm process 149505 Hi! I'm a child process 149506. Process Process-1: Traceback (most recent call last): File "/usr/lib64/python3.11/multiprocessing/process.py", line 314, in _bootstrap self.run() File "/usr/lib64/python3.11/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/tmp/tmp.iuW2VAurGG/scratch.py", line 7, in child_process raise Exception("Oh no! :(") Exception: Oh no! :( AFTER CHILD EXECUTION! RIGHT?! 如果你跟进代码,你会注意到在 process.join() 调用之后精心放置了一个 print 语句,以模拟父进程在其子进程中引发未处理的异常后仍在运行。 克服这种情况的一种方法是在子进程中实际处理异常,如下所示: ``` #!/usr/bin/env python import os import multiprocessing def child_process(): try: print(f"Hi! I'm a child process {os.getpid()}.") raise Exception("Oh no! :(") except Exception: print("Uh, I think it's fine now...") if __name__ == "__main__": print(f"Hi! I'm process {os.getpid()}") #Here we create a new instance of the Process class and assign our #`child_process` function to be executed. Note the difference now that #we are using the `args` parameter now, this means that we can pass #down parameters to the function being executed as a child process. process = multiprocessing.Process(target=child_process) #We then start the process process.start() #And finally, we join the process. This will make our script to hang and #wait until the child process is done. process.join() print("AFTER CHILD EXECUTION! RIGHT?!") ``` 现在,异常将在子进程中得到处理,你可以控制子进程会发生什么,以及在这种情况下应该做什么。 最后的思考 在以并行方式执行依赖项的解决方案时,特别是在与 Process 类一起使用时,多处理模块非常强大。这为在其自己的隔离进程中执行任何函数增加了可能性。
我的笔记