博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用multiprocessing.managers开发跨进程生产者消费者模型
阅读量:4879 次
发布时间:2019-06-11

本文共 3409 字,大约阅读时间需要 11 分钟。

研究了下multiprocessing.managers,略有收获,随笔一篇;

核心思路是构造一个manager进程,这个进程可以通过unix socket或tcp socket与其它进程通信;因为利用了socket,所以通信的进程间不要求具备父子关系,甚至可以跨主机(通过tcp socket);

通过manager进行数据结构共享,可以应用于很多的IPC场景;这里只做一个示例,在manager内维护一个队列,做为生产者消费者模型中的消息队列;

 

# coding:utf-8import osimport timeimport subprocessfrom multiprocessing import Process#from multiprocessing.managers import SyncManagerfrom multiprocessing.managers import BaseManagerfrom multiprocessing import JoinableQueue#父进程pid = os.getpid()print("parent pid %d" % pid)class TestManager(BaseManager):  pass#用于共享的队列jqueue = JoinableQueue()#用于获取队列的方法def _get_queue():    return jqueue#启动server(manager)def make_server(port, authkey):    # 注册rpc,获取共享的队列(的代理,这段代码的实现很有意思,建议看看源码)    TestManager.register("get_queue",callable=lambda : _get_queue())    # 如果要走tcp socket,就用这一行    #manager = TestManager(address=('', port), authkey=authkey)    # 如果要走unix socket,就用这一行    manager = TestManager(authkey=authkey)    # 启动server(manager)进程    manager.start()    return manager# consumer进程的入口def do_consume(manager):    print ("consumer pid %d" % os.getpid())    queue=manager.get_queue()    count = 0    while True:        item = queue.get(block=True)        #print(item)        #time.sleep(1)        queue.task_done()        if item is None:            conn = queue._tls.connection            break        count += 1    print ("done consuming %d" % count)#构造新的manager实例做为client连接manager serverdef make_client(address, authkey):    # 注册rpc,用于非父子进程环境时,新构造的manager识别rpc方法    TestManager.register("get_queue")    manager = TestManager(address=address, authkey=authkey)    manager.connect()    return manager# producer进程的入口def do_produce(address, authkey):    print ("producer pid %d" % os.getpid())    client=make_client(address, authkey)    queue=client.get_queue()    for i in range(10000):        queue.put(i, block=True)    print ("done producing")# terminator进程的入口def do_terminate(address, authkey):    client=make_client(address, authkey)    queue=client.get_queue()    queue.put(None, block=True)authkey = b'foo'manager=make_server(6666, authkey)address = manager._address# 查看manager的进程号print ("manager pid %d" % manager._process.ident)# 通过父子进程变量传递的方式,向consumer进程传递managerconsumer = Process(target=do_consume, args=(manager, ))consumer.start()# 伪造非父子进程传递address和authkey的方式,向producer进程传递连接manager需要的信息producer = Process(target=do_produce, args=(address, authkey))producer.start()# 查看当前的进程树status, output = subprocess.getstatusoutput('pstree -p %d' % pid)print (output) producer.join()# 伪造非父子进程传递address和authkey的方式,再启动一个terminator进程结束通信terminator = Process(target=do_terminate, args=(address, authkey))terminator.start()terminator.join()consumer.join()

  

 

以上示例代码的过程如下:

  1. 构造manager server;
  2. 构造一个consumer进程,直接从父进程获取到manager对象;
  3. 再构造一个producer进程,通过传递address和authkey,新构造manager client,并连接manager server;
  4. producer获取到共享队列,生产消息;
  5. consumer获取到共享队列,消费消息;
  6. terminator(producer)生产一个空消息;
  7. consumer获取到空消息,消费结束;

 

结果:

parent pid 30460manager pid 30461consumer pid 30463producer pid 30464python(30460)-+-pstree(30465)              |-python(30461)-+-{python}(30462)              |               |-{python}(30472)              |               |-{python}(30474)              |               `-{python}(30475)              |-python(30463)              `-python(30464)done producingdone consuming 10000

  

可以看到共生成4个子进程,一个manager server、一个consumer、一个producer、还有一个pstree查看进程树;

转载于:https://www.cnblogs.com/ZisZ/p/10770436.html

你可能感兴趣的文章
大数智能未来
查看>>
virtualenv和virtualenvwrapper 的安装和使用
查看>>
MAC sublime text 无法自动补齐标签
查看>>
NgBook留言本开发全过程(1)
查看>>
LeetCode-指针法
查看>>
Python之路,Day12 - 那就做个堡垒机吧
查看>>
linux之shell之if、while、for语句介绍
查看>>
Mysql phpStudy升级Mysql版本,流产了怎么办?
查看>>
SQLServer之数据库行锁
查看>>
OFDM仿真
查看>>
浅谈linux内核中内存分配函数
查看>>
走近SpringBoot
查看>>
thinkphp3.2.3分页
查看>>
python程序之profile分析
查看>>
写在读研初期
查看>>
开环增益对负反馈放大电路的影响
查看>>
MySQL-ERROR 2003
查看>>
SQL Server2012-SSIS的包管理和部署
查看>>
JavaScript内置对象
查看>>
如何把js的循环写成异步的
查看>>