skynet lua层消息的执行顺序
前几篇讲解了服务之间互相发送消息,消息的挂起,恢复,fork等等,感觉还是有点搞不清他们之间是怎么协作的,例如fork产生的协程什么时候被调用,如果有多个fork又怎么被调用.这篇试着讲解一下.
首先要明白的是,一般所有的lua层函数都是以协程的方式被执行的,包括fork产生的函数.除非你在skynet.start()之外调用函数.这点通过前面的分析可以知道. start()函数调用timeout产生协程. fork产生协程列表.
我们知道,lua层设置的回调函数为skynet.dispatch_message. 他主要调用raw_dispatch_message. 在那里才是驱动协程函数执行的地方.一个协程结束或挂起之后将由suspend函数来接管,相信通过前面的讲解,大家对这个函数不陌生了.
如果入口函数start没有调用fork,sleep,wait之类的函数,那么驱动start()执行的消息将结束.看看我们的协程池函数:
local function co_create(f) local co=table.remove(coroutine_pool) if co==nil then co=coroutine.create(function(...) f(...) --① 函数执行完毕 while true do f=nil coroutine_pool[#coroutine_pool+1]=co f=coroutine_yield "EXIT" --②挂起协程 f(coroutine_yield()) end end) else coroutine_resume(co, f) --③ end return coend
在①处函数执行完毕,然后进入②,协程挂起. 将由suspend函数接管.执行cmd=='EXIT'分支.
当再次收到消息时(注意,一般再次收到消息是其他服务的call调用,类型不为response),调用skynet.dispatch_message时,参看前面raw_dispatch_message的代码,如果消息类型不为response,那么将再次调用co_create().此时将执行3??,回到前次消息挂起的地方,返回的f就是想要执行的、在dispatch中设置的回调函数。之后协程再次挂起,然后又恢复,直至suspend接管。
如果有skynet.sleep(),那么会产生一个sessionID并被挂起,suspend函数接管,用session_id_coroutine关联session和当前协程。等到时间到会产生resond类型的消息,根据sesssionID找到协程,恢复协程,此时start(func)中的func函数才算结束,即上面的①。
我们注意到suspend函数的'SLEEP'分支还有一个sleep_session表,他是用了干嘛的呢?
查找skynet.lua源码,发现skynet.wakeup引用到他了。在介绍skynet.wakeup()之前,先说说skynet.fork()。
skynet.fork()也会调用协程池函数co_create()来产生协程。那么调用这个函数时,会走③的流程,然后coroutine_resume(co)吗,那样岂不是乱套了。不会的!因为这个时候当前函数还没有结束呢,coroutine_pool仍然为空,走的还是if分支。所以他只是创建一个协程,插入到fork_queue表中,并没有启用协程。
我们追踪fork_queue,发现是在raw_dispatch_message结束后调用的,而且是顺序取出协程,并执行。raw_dispatch_message调用结束,可能是入口函数执行完毕,调用了yield('EXIT'),也有可能执行了sleep,调用了yield('SLEEP'),这都会导致suspend返回。
这里我们可以回到开头提出的问题了。skynet.fork()产生的的协程函数,将在主协程(start()启用的)结束或挂起时被调用。有多个fork产生的协程函数时,将按顺序挨个取出执行,全部执行完毕时,skynet.dispatch_message才算结束,下个消息才得以到达。在c接口层,同一个服务的消息队列是严格按照顺序来执行的,所以这样没有问题。
再次强调,lua协程只是模拟了多线程的执行,并不会真正有多个协程在执行,所以确保skynet.fork()函数中不要出现死循环或者很费时的操作,因为他不会像linux一样真正多线程的执行。
填一下上面挖的坑。skynet.wakeup是用来干嘛的呢?
我们想提前唤醒执行sleep函数的协程可以吗。当然可以,skynet.wakeup就是来干这个的。sleep后,何时去执行skynet.wakeup,毕竟sleep挂起协程之后要等到超时才会重新恢复,既然恢复之后,再调用wakeup还有什么用呢。你可能想到了,在skynet.fork()产生的协程中调用。前面说到了,sleep之后就可以执行fork里的协程了。skynet.wakeup是怎么做到的呢?
看看skynet.wakeup的实现:
function skynet.wakeup(co) if sleep_session[co] then table.insert(wakeup_queue, co) return true endend
他只是根据sleep_session表来产生一个新的表wakeup_queue。
如果要唤醒的协程没有在sleep_session表中,则不会有任何结果。前面说到sleep()函数会让suspend产生sleep_session表来关联要挂起的协程。所以waitup一个sleep的协程会插入到wakeup_queue表中。而wakeup_queue是在disptch_wakeup中取出并恢复的。disptch_wakeup仍然是在suspend中最后调用的,也就是说一个协程被挂起或结束后会去检查wakeup_queue列表。
disptch_wakeup的实现为:
local function dispatch_wakeup() local co=table.remove(wakeup_queue,1) if co then local session=sleep_session[co] if session then session_id_coroutine[session]="BREAK" return suspend(co, coroutine_resume(co, false, "BREAK")) end endend
注意,即使调用了wakeup,也有可能不会立马唤醒那个协程,因为wakeup只是插入了一个挂起的协程,真正要到disptch_wakeup中去恢复,如果wakeup后面有个很费时的操作,那么将等到他执行完毕,才会在suspend最后去调用disptch_wakeup。
这下我们明白了sleep_session的作用。然而我们发现在skynet.wait()函数里面也有这个表,部分代码也与skynet.sleep()类似。
不难猜到skynet.wait()的作用:挂起指定的或者正在运行的协程。由于他不会超时,所以必须由skynet.wakeup()来唤醒。我们可以用这组函数来实现资源的同步。
有个有趣的问题是,sleep之后,被wakeup,sleep超时之后仍然会收到一条response类型的消息,只不过sessionID对应的协程在dispatch_wakeup中已经置为'BREAK'了,他不会再做任何操作了。
最后附一张图,来说明上面的过程:
skynet lua层消息的执行顺序.png至此,skynet lua层中基本所有的消息流程就分析到了。周日花了差不多一下午写的,坐着很累了,如有遗漏的地方下次再补充。
“
本文主要介绍skynet的设计理念和特点,对于具体实现细节暂不展开。
”
skynet是什么?
“
skynet 是一个为网络游戏服务器设计的轻量框架。但它本身并没有任何为网络游戏业务而特别设计的部分,所以尽可以把它用于其它领域。"
”
skynet设计初衷
“
作为服务器,通常需要同时处理多份类似的业务。例如在网络游戏中,你需要同时向数千个用户提供服务;同时运作上百个副本,计算副本中的战斗、让 NPC 通过 AI 工作起来,等等。在单核年代,我们通常在 CPU 上轮流处理这些业务,给用户造成并行的假象。而现代计算机,则可以配置多达数十个核心,如何充分利用它们并行运作数千个相互独立的业务,是设计 skynet 的初衷。
”
并发编程模型解决方案
因此,skynet提供了一种多核并发的解决方案,充分利用了多核优势。
常见的多核并发解决方案有:多进程,多线程,csp模型,actor模型。接下来简单介绍和对比这四种并发模型。
多进程并发模型
并发实体:进程
进程间通信方式:socket,共享内存,管道,信号量,unix域等。
优点:隔离性好,因为每个进程都有自己独立的进程空间
缺点:统一性差,即数据同步比较麻烦;解决方案(消息队列zeromq解决最终一致性问题,rpc解决强一致性问题,zookeeper解决服务协调的问题)
多线程并发模型
并发实体:线程
线程间通信方式:消息队列,管道,锁等
优点:统一性强,因为线程都在同一个进程内(这里的多线程是指同一进程内的多线程)
缺点:隔离性差,线程间共享了很多资源,并且可以轻易的访问其他线程的私有空间,需要使用锁来进行控制。(锁的类型选择和粒度控制都是比较难的)
csp并发模型
描述两个独立的并发实体通过**共享的通讯 channel(管道)**进行通信的并发模型。
Golang 借用CSP模型仅仅是借用了 process和channel这两个概念来实现自己的并发模型,process是在go语言上的表现就是 goroutine ,也是go并发执行的实体,每个实体之间是通过channel通讯来实现数据共享。(可理解为加强版多线程解决方案)
actor模型
并发实体当然是actor。那么actor是什么呢?其实actor是从语言层面抽象出来的进程概念,erlang是从语言层面来实现actor模型。(可理解为加强版多进程解决方案)
actor模型有以下特点:
用于并行计算actor是最基本的计算单元基于消息计算actor之间相互隔离,通过消息进行沟通那么skynet也采用了actorr模型,不过,不同于erlang,skynet是通过框架来实现actor模型。skynet使用内存块和lua虚拟机来进行环境隔离,actor之间通过消息队列进行沟通,通过指针传递即可达到通信目的。
那么actor模型有哪些优势呢?我们可以启动上千万个actor并发实体,而进程/线程模型中并发实体个数是有限的。
skynet中actor的隔离与通信
其实actor就是skynet中的服务,服务分为c服务****和lua服务(比如,main.lua就是一个actor),actor的结构组成如下:
隔离环境,内存块或lua虚拟机回调函数,用于执行actor,消费消息消息队列,用于存储消息隔离
对于c服务隔离环境为内存块,lua服务隔离环境为lua虚拟机。
//service_logger.c//c服务隔离环境为内存块structlogger{FILE*handle;char*filename;uint32_tstarttime;intclose;};//service_snlua.c//lua服务隔离环境为lua虚拟机structsnlua{lua_State*L;structskynet_context*ctx;size_tmem;size_tmem_report;size_tmem_limit;lua_State*activeL;volatileinttrap;};//skynet_server.c//context上下文隔离环境structskynet_context{void*instance;structskynet_module*mod;void*cb_ud;skynet_cbcb;structmessage_queue*queue;...};
lua一般用来做业务开发(lua服务),c一般实现底层框架以及一些计算密集型的业务(c服务)。**可以将skynet理解为一个简单的操作系统,可以用来调度数千个lua虚拟机(进程),让他们并行工作。**每个lua虚拟机都可以接收其他虚拟机发送过来的消息,以及对其他虚拟机发送消息。
通信
skynet中actor的运行和通信都通过消息来驱动:
全局消息队列:存储有消息的actor消息队列指针actor消息队列:存储专属actor的消息队列如下图:
skynet_msg
工作流程:
从全局消息队列中取出actor消息队列,(这一步需要加锁,采用自旋锁,尽可能不让worker线程休眠,榨干cpu)从actor消息队列中取出消息,并通过回调函数处理(消费actor中的消息);因此不用担心一个服务同时被多个线程处理,即单个服务的执行,不存在并发,也即线程安全。如果actor消息队列还有消息,将actor消息队列放入全局消息队列的队尾,起到公平调度。消息生产方式主要为:
actor之间通信产生;网络中产生定时器产生消息的消费方式只有一种:,通过回调函数进行消费。
因为actor之间通信直接通过指针传递,因此服务间的通信非常高效。
注意:actor之间发送消息是不需要唤醒worker条件变量的,因为actor之间发送消息,则至少有一个worker线程在工作。
skynet每个服务均有一个协程池,lua服务收到消息时,会优先去池子里取一个协程出来,即,就视为收到一个消息,就创建一个协程吧
skynet中的线程
timer线程:运行定时器socket线程,进行网络数据的收发worker线程:负责对消息队列进行调度monitor线程:用于检测节点内的消息是否堵住线程创建
//skynet_start.c//skynet启动是会创建以上四种线程staticvoidstart(intthread){...create_thread(&pid[0],thread_monitor,m);//monitor线程create_thread(&pid[1],thread_timer,m);//timer线程create_thread(&pid[2],thread_socket,m);//socket线程//根据权重创建worker线程staticintweight[]={-1,-1,-1,-1,0,0,0,0,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,3,3,3,3,3,3,3,3,};structworker_parmwp[thread];for(i=0;i<thread;i++){wp[i].m=m;wp[i].id=i;if(i<sizeof(weight)/sizeof(weight[0])){wp[i].weight=weight[i];}else{wp[i].weight=0;}create_thread(&pid[i+3],thread_worker,&wp[i]);}}
线程间使用管道进行通信。其中socket线程和worker线程通过pipe进行通信。
服务模块将数据,通过socket发送给客户端时,并非将数据写入消息队列,通过pipe从worker线程发送给socket线程,并交由socket转发。
skynet作为游戏服务器时,我们编写的不同的业务逻辑,独立运行在不同的上下文环境,并且能通过某种方式,相互协作,共同服务于玩家。
skynet 业务是由lua来开发,与底层沟通以及计算密集的都需要用c。
skynet向epoll进行注册:connected, clients, listened, pipe读端(worker线程往管道写端写数据,socket线程在管道读端读数据)
skynet中内存分配采用jemalloc。
以上,是为一个初学者对skynet的理解。
参考
skynet Wiki:https://github.com/cloudwu/skynet/wiki
云风BLOG:https://blog.codingnow.com/2012/09/the_design_of_skynet.html
skynet源码欣赏:https://manistein.github.io/blog/post/server/skynet/skynet%E6%BA%90%E7%A0%81%E8%B5%8F%E6%9E%90/
Golang CSP:https://www.jianshu.com/p/36e246c6153d
发表评论