27 July 2009

Scala里多线程的基础就是Actor,核心思想是用消息传递来进行线程间的信息共享和同步。

Scala的Actor线程模型可以这样理解:所有Actor共享一个线程池,总的线程个数可以配置,也可以根据CPU个数决定;当一个Actor启动之后,Scala分配一个线程给它使用,如果使用receive模型,这个线程就一直为该Actor所有,如果使用react模型,Scala执行完react方法后抛出异常,则该线程就可以被其它Actor使用。

下面看一些核心代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  def start(): Actor = synchronized {
    // Reset various flags.
    //
    // Note that we do *not* reset `trapExit`. The reason is that
    // users should be able to set the field in the constructor
    // and before `act` is called.
 
    exitReason = 'normal
    exiting = false
    shouldExit = false
 
    scheduler execute {
      ActorGC.newActor(Actor.this)
      (new Reaction(Actor.this)).run()
    }
 
    this
  }

其中Reaction实现Runnable接口,scheduler基本相当于是一个线程池,所以调用start方法之后会有一个线程来为该Actor服务。

使用receive模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  def receive[R](f: PartialFunction[Any, R]): R = {
    assert(Actor.self == this, "receive from channel belonging to other actor")
    this.synchronized {
      if (shouldExit) exit() // links
      val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
      if (null eq qel) {
        waitingFor = f.isDefinedAt
        isSuspended = true
        suspendActor()
      } else {
        received = Some(qel.msg)
        sessions = qel.session :: sessions
      }
      waitingFor = waitingForNone
      isSuspended = false
    }
    val result = f(received.get)
    sessions = sessions.tail
    result
  }

如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait;如果有消息,这调用PartialFunction进行处理。

使用react模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  def react(f: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.self == this, "react on channel belonging to other actor")
    this.synchronized {
      if (shouldExit) exit() // links
      val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
      if (null eq qel) {
        waitingFor = f.isDefinedAt
        continuation = f
        isDetached = true
      } else {
        sessions = List(qel.session)
        scheduleActor(f, qel.msg)
      }
      throw new SuspendActorException
    }
  }

如果当前mailbox没有可以处理的消息,设置waitingFor和continuation,这两个变量会在接收到消息的时候使用;如果有消息,则调用scheduleActor,该方法会在线程池里选择一个新的线程来处理,具体的处理方法也是由PartialFunction决定。不管是哪条路径,react都会立即返回,或者说是立即抛出异常,结束该线程的执行,这样该线程就可以被其它Actor使用。

再来看看接收消息的处理代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
    if (waitingFor(msg)) {
      received = Some(msg)
 
      if (isSuspended)
        sessions = replyTo :: sessions
      else
        sessions = List(replyTo)
 
      waitingFor = waitingForNone
 
      if (!onTimeout.isEmpty) {
        onTimeout.get.cancel()
        onTimeout = None
      }
 
      if (isSuspended)
        resumeActor()
      else // assert continuation != null
        scheduler.execute(new Reaction(this, continuation, msg))
    } else {
      mailbox.append(msg, replyTo)
    }

}
如果当前没有在等待消息或者接收到的消息不能处理,就丢到mailbox里去;相反,则进行消息的处理。这里对于receive模型和react模型就有了分支:如果isSuspended为true,表示是receive模型,并且线程在wait,就调用resumeActor,该方法会调用notify;否则就是react模型,同样在线程池里选择一个线程进行处理。



blog comments powered by Disqus