1. <em id="yud1w"><acronym id="yud1w"><u id="yud1w"></u></acronym></em>
      
      
      <button id="yud1w"></button>

      python教程

      當前位置:首頁?>?python并發編程?>?當前文章

      python并發編程

      進程池Pool及管道Pipe生產者消費者模型

      2020-08-08 60贊 老董筆記

        上一篇通過Queue隊列實現了生產者消費者模型,利用進程池Pool實現生產者消費者模型思路是一樣的。

        如果用管道Pipe()去實現也是一個道理,只不過如果是多個進程同時讀寫管道Pipe()的一端可能會發生數據混亂!官方文檔有明確提示:

      The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
      The two connection objects returned by Pipe() represent the two ends of the pipe. 
      Each connection object has send() and recv() methods (among others). 
      Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. 
      Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

        所以多個生產者和消費者時需要加鎖,建議大家用隊列去實現,隊列其實就是管道加鎖(保護數據的機制)的機制!

        進程池-多個生產者和消費者模型

      # ‐*‐ coding: utf‐8 ‐*‐
      
      from multiprocessing import Manager
      from multiprocessing import Pool
      import random
      import time
      import os
      
      
      def produce(queue):
          for i in range(5):
              print("produce子進程%s生產 girl_%s" % (os.getpid(),str(i)))
              queue.put('girl_'+ str(i))
              time.sleep(random.randint(0,1))
      
      
      def consumer(queue):
          while True:
              if not queue.empty():
                  girl = queue.get()
                  if girl != 'man':
                      print("consumer子進程%s消費 %s" % (os.getpid(),girl))
                      time.sleep(random.randint(0, 1))
                  else:
                      print("consumer子進程%s獲取結束信號man" % os.getpid())
                      queue.put('man')  # 放到隊列,讓其他消費者也能獲取man
                      break
      
      
      if __name__ == "__main__":
      
          girls_q = Manager().Queue()
      
          p_pool = Pool(3)  # 生產者進程池
          c_pool = Pool(4)  # 消費者進程池
      
          for i in range(3):
              p_pool.apply_async(func=produce,args=(girls_q,))
          for i in range(4):
              c_pool.apply_async(func=consumer,args=(girls_q,))
          p_pool.close()
          c_pool.close()
      
          p_pool.join()
          girls_q.put('man')  # 生產結束信號
          c_pool.join()
      
          print("生產者消費者模型完畢~~~")
      
      
      D:installpython3python.exe D:/pyscript/test/test1.py
      produce子進程9712生產 girl_0
      produce子進程9712生產 girl_1
      consumer子進程11652消費 girl_0
      produce子進程7140生產 girl_0
      produce子進程12788生產 girl_0
      produce子進程12788生產 girl_1
      consumer子進程11704消費 girl_1
      consumer子進程4724消費 girl_0
      consumer子進程10880消費 girl_0
      produce子進程9712生產 girl_2
      produce子進程9712生產 girl_3
      produce子進程9712生產 girl_4
      consumer子進程11652消費 girl_1
      produce子進程7140生產 girl_1
      produce子進程7140生產 girl_2
      produce子進程7140生產 girl_3
      produce子進程7140生產 girl_4
      produce子進程12788生產 girl_2
      consumer子進程11704消費 girl_2
      consumer子進程11704消費 girl_3
      consumer子進程4724消費 girl_4
      consumer子進程4724消費 girl_1
      consumer子進程10880消費 girl_2
      consumer子進程11652消費 girl_3
      produce子進程12788生產 girl_3
      consumer子進程11704消費 girl_4
      consumer子進程11704消費 girl_2
      consumer子進程4724消費 girl_3
      produce子進程12788生產 girl_4
      consumer子進程10880消費 girl_4
      consumer子進程11652獲取結束信號man
      consumer子進程11704獲取結束信號man
      consumer子進程10880獲取結束信號man
      consumer子進程4724獲取結束信號man
      生產者消費者模型完畢~~~
      
      Process finished with exit code 0
      
      


        管道-單個生產者和消費者模型

        管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生成EOFError異常。

      # ‐*‐ coding: utf‐8 ‐*‐
      """
      https://docs.python.org/3/library/multiprocessing.html
      """
      import multiprocessing
      import random
      import time
      import os
      
      def producer(conn1):
      
          for i in range(5):
              item = random.randint(1, 10)
              conn1.send(item)
              print('producer進程({0}) 生產:{1}'.format(os.getpid(), item))
              time.sleep(0.2)
          conn1.close()
      
      
      def consumer(conn2):
      
          while True:
              try:
                  item = conn2.recv()
              except EOFError:
                  conn2.close()
                  break
              else:
                  print('cusumer進程({0})消費{1}'.format(os.getpid(),item))
                  time.sleep(0.2)
      
      
      
      if __name__ == "__main__":
      
          conn1,conn2 = multiprocessing.Pipe()
      
          process_producer = multiprocessing.Process(
              target=producer, args=(conn1,))
      
          process_consumer = multiprocessing.Process(
              target=consumer, args=(conn2,))
      
          process_producer.start()
          process_consumer.start()
      
          process_producer.join()
          conn1.close()
          process_consumer.join()
          conn2.close()
      
          print("一切結束!每個進程中都要關閉conn")
      
      
      D:installpython3python.exe D:/pyscript/test/test1.py
      producer進程(8652) 生產:1
      cusumer進程(10084)消費1
      producer進程(8652) 生產:5
      cusumer進程(10084)消費5
      producer進程(8652) 生產:10
      cusumer進程(10084)消費10
      producer進程(8652) 生產:6
      cusumer進程(10084)消費6
      producer進程(8652) 生產:4
      cusumer進程(10084)消費4
      一切結束,每個進程中都要關閉conn
      
      Process finished with exit code 0
      
      

      感興趣直接點擊圖片獲取>>

      文章評論

      進程池Pool及管道Pipe生產者消費者模型文章寫得不錯,值得贊賞
      国产99视频精品免视看6