诗号:半神半圣亦半仙,全儒全道是全贤,脑中真书藏万卷,掌握文武半边天。

本文将从源码分析 React 中的 packages/scheduler 调度器的实现和应该(vue schduler)。

入局点: scheduler/src/__tests__/Scheduler-test.js

先从官方测试看如何开始 scheduler.

第一个 Scheduler-test.js 用到的函数: scheduleCallback(prioprity, callback, option)

任务调试的入口函数,找到了第一个目标接下来就是沿着这个引线一步步去展开,从而了解 整个 Scheduler 实现过程。

/img/react/scheduler.svg

声明:

  1. 本文都是从 MessageChannel 角度去分析任务执行,通过管道信号给 performWorkUntilDeadline 执行 flushWork

  2. 只针对 Scheduler.js 和 ScheduleMinHeap.js 去了分析,因为最主要的代码都在这两 个里面,其它的要么是测试要么是已经废弃的。

重要知识点

任务队列

Scheduler 涉及到两个任务队列: taskQueue, timerQueue, 从目前来看,前者是已经过期 了的任务队列,后者是有延迟的(将来的)任务队列。

当任务的 startTime > currentTime 的时候会被加入到 timerQueue,等待延迟处理,否则 会被加入到 taskQueue,一旦有空闲就会优先被处理的任务。

配套的函数:

requestHostTimeout <-> cancelHostTimeout: 用来执行延迟任务的函数,request 发起执 行,cancel 取消执行,request 发起之后 callback 不一定就会被执行,有可能会被新进 来的且任务优先级更高或更早的任务插队,因为在 scheduleCallback 中会在检测有任务已 经发起执行的时候,先调用 cancel 取消计时器,阻止上一个任务的执行(前提是这个新任 务必须要比这个正在执行的更早: peek(timerQueue), 且 taskQueue 里面还不能有其它任 务,因为它的时间是比 currentTime 还小的时间(过期了都,还不让我先走???))。

requestHostCallback: 用来发起 taskQueue 执行的函数。

重要标记(全局变量)

下表,列出了 Scheduler 中用到的所以关键的全局变量(包括一些重要函数),以及它们用 途,用在了哪个函数中。

  1. isMessageLoopRunning, boolean: 用来控制管道是不是能继续接受信号,这决定了

performWorkUntilDeadline 能不能被执行(flushWork)。

关联函数: performWorkUntilDeadline,requestHostCallback

  1. taskIdCounter: number, 任务的唯一 ID ,每个 callback 都会被封装一层,里面就包 含一个 id 由这个 taskIdCounter++ 得到

    关联函数: scheduleCallback

  2. taskQueue, Array<Task>, 已经过了当前时间还有没被执行的任务,它们会在下一个空 闲时间优先执行

    关联函数: scheduleCallback

  3. timerQueue, Array<Task>, 还末过期的任务,将会以 ~setTimeout~(requestHostTimeout) 方式触发

  4. isHostTimeoutScheduled: 是不是有 timerQueue 中的任务计时器已经启动了,在 flushWork 中检测,如果有需要将其实取消(停止计时器),让 taskQueue 中的任务先行

    关联函数: scheduleCallback, requestHostTimeout, flushWork

  5. isHostCallbackScheduled, boolean: 是不是存在执行 taskQueue 中的任务,这将阻止 scheduleCallback 中 taskQueue 在 push 入列之后是不是可以直接触发任务执行(结合 isPerformingWork)

    关联函数: scheduleCallback, flushWork

  6. isMessageLoopRunning, boolean: 消息管道的开头,这个决定了 PORT2(schedulePerformWorkUntilDeadline) 是不是可以继续向 PORT1(performWorkUntilDeadline) 发送 null 消息去触发 performWorkUntilDeadline -> flushWork -> workLoop

    关联函数: requestHostCallback, performWorkUntilDeadline

  7. needsPaint, boolean: TODO

  8. scheduledHostCallback, function: 正在被处理的 callback, 其实就是 flushWork 函 数(调用 requestHostCallback(callback) 的 callback),在 scheduledHostCallback 中被调用之后重置为 null 去重新接受新的使命。

    关联函数: schedulePerformWorkUntilDeadline, requestHostCallback

  9. currentTask, object: 记录当前正在被处理的任务对象, 在 workLoop 中通过 peek(taskQueue) 得到,while 循环中会不断的 flush taskQueue。

    结构:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    {
      "id": taskIdCounter++,
      callback, // 任务函数
      startTime, // callback 被入列时的时间戳
      expirationTime, // 过期时间=startTime + (delay||0)
      // 任务优先级,总共有5种:
      // ImmediatePriority,
      // UserBlockingPriority,
      // IdlePriority,
      // LowPriority,
      // NormalPriority
      priorityLevel,
      sortIndex // 排序索引,值就是当前的 startTime(timerQueue) 或 expirationTime(taskQueue)
    }
    
  10. currentPriorityLevel, number: 当前任务的优先级,默认是 NormalPriority, flushWork 的时候实时更新它的值。

    关联函数: flushWork, workLoop

  11. yieldInterval, number, 5: TODO

  12. deadline, number, 5: TODO

  13. maxYieldInterval, number, 300: TODO

整体结构

在开始 scheduleCallback 之前,还是很有必要大致了解下 packages/scheduler 整个目录 的结构以及各个文件的作用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
╰─⠠⠵ tree -C .                                                                ~/github/react/react/packages/scheduler
./src
├── SchedulerFeatureFlags.js # 特性标记
├── SchedulerMinHeap.js # 管理Heap(堆)节点内存(含,push,peek,pop等函数)
├── SchedulerPriorities.js # 优先级的常量值
├── SchedulerProfiling.js # 日志相关一内容
├── __tests__ # Jest 测试用例
│   ├── Scheduler-test.js # 测试 scheduleCallback 函数
│   ├── SchedulerMock-test.js # SchedulerMock.js 中函数测试
│   ├── SchedulerPostTask-test.js # SchedulerPostTask.js 测试
│   ├── SchedulerProfiling-test.js # SchedulerProfiling.js 测试
│   ├── SchedulerSetImmediate-test.js # setImmediate 测试
│   ├── SchedulerSetTimeout-test.js # setTimeout 测试
│   └── SchedulerUMDBundle-test.internal.js # umd bundle 测试
└── forks
    ├── Scheduler.js # 主入口,scheduleCallback 就在这个里面
    ├── SchedulerFeatureFlags.www.js # scheduler 特性开头
    ├── SchedulerMock.js # 任务控制类内容(如:flushWork, workLoop, next等重要函数)
    └── SchedulerPostTask.js # runTask 在这里面

2 directories, 15 files

对应 js 中的一些重要函数和简介(此节过后会一个个来详细分析):

SchedulerMinHeap.js : 管理节点的存储,这里用的是栈的方式实现的,即节点会依据 FILO(先进后出)规则实施管理,相关函数。

  • siftUp(heap, node, i), 总是找前面集合的中间元素做参考元素来判断然后替换

  • siftDown(heap, node, i), 与 siftUp 相反,在 pop(heap, node) 且被调用,当取出第 一个(heap[0])之后,让 last 变成第一个然后执行 siftDown()

  • push(heap, node), 入栈,之后执行 siftUp(heap,node,oldLen)

  • pop(heap), 返回的是第一个节点(heap[0]),然后执行 siftDown(heap,node,0),

  • peek(heap), 总是返回第一个

SchedulerPriorities.js, 优先级常量

1
2
3
4
5
6
export const NoPriority = 0;
export const ImmediatePriority = 1;
export const UserBlockingPriority = 2;
export const NormalPriority = 3;
export const LowPriority = 4;
export const IdlePriority = 5;

SchedulerProfiling.js, 日志相关函数,包含: markTaskStart, markTaskCompleted, markTaskCanceled, markTaskErrored, markTaskRun, markTaskYield, markSchedulerSuspended, markSchedulerUnsuspended, 这些 markXxx 最后都是调用了 logEvent(entries)

Scheduler.js, scheduler 主要入口函数 scheduleCallback 就在这里以及其它的 callback 等其它处理函数,比如一核心函数(flushWork, workLoop, next,cancelCallBack)等等。

SchedulerPostTask.js, runTask(priorityLevel, postTaskPriority, node, callback) 函数实现。

切入正题 -> SchedulerMinHeap.js

WARNING

siftUp, siftDown 不是简单的排序操作,但是它们完成之后总是能保证 heap 的第一个任 务的 sortIndex 是最小的(时间戳, sortIndex 里保存的是当前任务被加入到队列时的时间 戳+它的delay)。

SchedulerMinHeap 节点栈管理(push,pop,peek)

SchedulerMinHeap.js 里面有六个函数:

push(heap, node) -> siftUp(heap, node, heapOldLength)

pop(heap) -> heap[0] -> siftDown(heap, node, 0)

peek(heap) -> heap[0]

compare(a, b) 比较两个节点,优先 node.sortIndex 然后 node.id

node.sortIndex 是任务入列时的时间戳(+delay, 如果有)。

Success

siftUp: 让新 push 的节点从队尾尽量的上浮,直到前面的数比它小就行。

siftDown: pop 之后,让heap中最后一个节点从第一个位置开始下沉,直到前面的数都比它 小就行。

好像这样也讲不通!!!

siftUp(heap,node,i)

sfitUp 会根据 node.sortIndex 和 node.id 将 heap 进行升序排序,先比 较 node.sortIndex, 如果 sortIndex 相同再比较 node.id

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
function siftUp(heap, node, i) {
  let index = i;
  while (index > 0) {
    const parentIndex = (index - 1) >>> 1
    const parent = heap[parentIndex]
    if (compare(parent, node) > 0) {
      // 找到比 node.id/sortIndex 更大的节点,然后交换
      heap[parentIndex] = node
      heap[index] = parent
      index = parentIndex
    } else {
      // 排序完成,没有更大的了
      return
    }
  }
}

function compare(a, b) {
  // 先比较 sort index 然后比较 task id
  const diff = a.sortIndex - b.sortIndex
  return diff !== 0 ? diff : a.id - b.id
}

function push(heap, node) {
  const index = heap.length
  heap.push(node)
  siftUp(heap, node, index)
}

const heap = [], vals = []

for (let i = 0 ; i < 10; i++) {
  const index = Math.floor(Math.random() * 10)
  if (!heap.find((val) => val.sortIndex === index)) {
    vals.push(index)
    push(heap, { sortIndex: index})
  }
}
console.log(vals, heap);
[
  9, 3, 1, 7,
  6, 5, 0, 8
] [
  { sortIndex: 0 },
  { sortIndex: 6 },
  { sortIndex: 1 },
  { sortIndex: 8 },
  { sortIndex: 7 },
  { sortIndex: 5 },
  { sortIndex: 3 },
  { sortIndex: 9 }
]

结果并不是按照一定顺序排列的,执行结果表:

parentIndex = (index - 1) >>> 2 等于是 Math.floor( index - 1 / 2 )

ivalindex/lenparentIndexcompareheap(省略对象)
090--[9]
1310,99 > 3[3,9]
2120,33 > 1[1,9,3]
3731,99 > 7[1,7,3,9]
10,11 < 7[1,7,3,9]
4641,77 > 6[1,6,3,9,7]
10,11 < 6[1,6,3,9,7]
5552,33 < 5[1,6,3,9,7,5]
20,11 < 5[1,6,3,9,7,5]
6062,33 > 0[1,6,0,9,7,5,3]
20,11 > 0[0,6,1,9,7,5,3]
7873,99 > 8[0,6,1,8,7,5,3,9]
31,66 < 8[0,6,1,8,7,5,3,9]

也就是说它总是会根据 index 去找其前面的所有元素的中间位置的元素来和新的 node 进 行比较,如果值比新的节点大就进行替换。

比如

i=1,val=3,heap=[9,3],target-heap=[9],target=9,替换之后=[3,9]

i=2,val=1,heap=[3,9,1],target-heap=[3,9],target=3,替换之后=[1,9,3]

i=3,val=7,heap=[1,9,3,7],target-heap=[1,9,3],target=9,替换之后=[1,7,3,9]

i=4,val=6,heap=[1,7,3,9,6],target-heap=[1,7,3,9],target=7,替换之后=[1,6,3,9,7]

依次类推到最后得到 [0,6,1,8,7,5,3,9]

siftDown(heap, node, i)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
function pop(heap) {
  if (heap.length === 0) {
    return null
  }

  const first = heap[0]
  const last = heap.pop()
  if (last !== first) {
    heap[0] = last
    siftDown(heap, last, 0)
  }
  return first
}

function siftDown(heap, node, i) {
  let index = i
  const length = heap.length
  const halfLength = length >>> 1
  while (index < halfLength) {
    const leftIndex = (index + 1) * 2 - 1
    const left = heap[leftIndex]
    const rightIndex = leftIndex + 1
    const right = heap[rightIndex]

    if (compare(left, node) < 0) {
      if (rightIndex < length && compare(right, left) < 0) {
        heap[index] = right
        heap[rightIndex] = node
        index = rightIndex
      } else {
        heap[index] = left
        heap[leftIndex] = node
        index = leftIndex
      }
    } else if (rightIndex < length && compare(right, node) < 0) {
      heap[index] = right;
      heap[rightIndex] = node;
      index = rightIndex;

    } else {
      // Neither child is smaller. Exit.
      return
    }
  }
}

结合 sitUp 和 push 来测试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
const {siftUp, push, siftDown, pop} = require(process.env.BLOG_JS + '/react/pkgs/scheduler.js')

// 先塞一些节点到 heap
const heap = [],vals = []

for (let i = 0 ; i < 10; i++) {
  const index = Math.floor(Math.random() * 10)
  if (!heap.find((val) => val.sortIndex === index)) {
    vals.push(index)
    push(heap, { sortIndex: index})
  }
}
console.log(vals)
console.log('push', heap);

// 然后用 pop 取第一个
const node = pop(heap)

console.log('pop', node, '\n', heap)
[
  9, 5, 4, 2,
  0, 1, 3
]
push [
  { sortIndex: 0 },
  { sortIndex: 2 },
  { sortIndex: 1 },
  { sortIndex: 9 },
  { sortIndex: 4 },
  { sortIndex: 5 },
  { sortIndex: 3 }
]
pop >>  { first: { sortIndex: 0 }, last: { sortIndex: 3 } }
pop { sortIndex: 0 }
 [
  { sortIndex: 1 },
  { sortIndex: 2 },
  { sortIndex: 3 },
  { sortIndex: 9 },
  { sortIndex: 4 },
  { sortIndex: 5 }
]

根据上面的示例来分析下整个过程:

pop(heap, node) -> heap[0] -> heap[0] = last -> siftDown(heap, node, 0)

当前 heap = [0, 2, 1, 9, 4, 5, 3],

pop first = 0,

last=3 -> first

-> heap=[3,2,1,9,4,5], node=3

indexhalfleft[Index]right[Index]left<noderight<leftright<nodeheap
031,22,12 < 3, true1<2, true-[1,2,3,9,4,5]
133,94,43 < 3, false-4<3,false[1,2,3,9,4,5]

经过两次 while(index < halfLength) 后结束,得到 [1,2,3,9,4,5]

  1. left, right 是两个相邻的节点(right=left+1)

  2. 先比较 left<node ? right<node -> right与node替换 : left与node替换

  3. 如果 left>node 比较 right<node -> right与node替换

scheduleCallback(priorityLevel,callback,options)

  1. startTime, 入列起始时间戳,如果 options.delay > 0 用当前时间戳加上delay

  2. timeout, 根据 priorityLevel 设置对应的优先级值,共有五种优先级

    ImmediatePriority, timeout=-1

    UserBlockingPriority, timeout=250

    IdlePriority, timeout=Math.pow(2,30)-1=1073741823

    LowPriority, timeout=10000

    NormalPriority, timeout=5000

  3. 过期时间 expirationTime = startTime + timeout

  4. 封装 newTask = {id, callback, priorityLevel, startTime, expirationTime, sortIndex}

  5. 检查 startTime > currentTime ,是不是入列的时间已经过了当下时间,如果过了要做延时处理, 使用 expirationTime 做 sortIndex,否则直接用 startTime 做 sortIndex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
function scheduleCallback(priorityLevel, callback, options) {
  var currentTime = getCurrentTime()

  var startTime // 任务执行的开始时间
  if (typeof options === 'object' && options !== null) {
    var delay = options.delay
    if (typeof delay === 'number' && delay > 0) {
      startTime = currentTime + delay
    } else {
      startTime = currentTime
    }
  } else {
    startTime = currentTime
  }

  var timeout // 根据优化级设置超时时间
  switch (priorityLevel) {
    case ImmediatePriority:
      timeout = -1
      break
    case UserBlockingPriority:
      timeout = 250
      break
    case IdlePriority:
      // Max 31 bit integer. The max integer size in V8 for 32-bit systems.
      // Math.pow(2, 30) - 1
      // 0b111111111111111111111111111111
      timeout = 1073741823
      break
    case LowPriority:
      timeout = 10000
      break
    case NormalPriority:
      timeout = 5000
      break
  }

  // 过期时间
  var expirationTime = startTime + timeout

  // 封装新任务
  var newTask = {
    id: taskIdCounter++,
    callback,
    priorityLevel,
    startTime,
    expirationTime,
    sortIndex: -1
  }

  if (startTime > currentTime) {
    // 延迟的任务,应该进入队列排队,用肇始时间做索引
    newTask.sortIndex = startTime
    push(timerQueue, newTask)
    // peek 取队列中第一个任务 queue[0]
    if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
      // queue: [null, newTask] 情况
      // 所有的任务还在排队中,且当前的 newTask 就是最早过期的那个
      if (isHostTimeoutScheduled) {
        cancelHostTimeout()
      } else {
        isHostTimeoutScheduled = true
      }
      requestHostTimeout(handleTimeout, startTime - currentTime)
    }
  } else {
    newTask.sortIndex = expirationTime
    push(taskQueue, newTask)
    // Schedule a host callback, if needed. If we're already performing work,
    // wait until the next time we yield.
    if (!isHostCallbackScheduled && !isPerformingWork) {
      isHostCallbackScheduled = true
      requestHostCallback(flushWork)
    }
  }

  return newTask
}

这里用到了几个函数: cancelHostTimeout, requestHostCallback, requestHostTimeout, 它们又分别是是做什么了?

cancelHostTimeout()

scheduleCallback 中执行这个时机是, startTime > currentTime 时,且 taskQueue 中 没有了任务,且 newTask 正好是 timerQueue 中最早的那个。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// peek 取队列中第一个任务 queue[0]
if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
  // queue: [null, newTask] 情况
  // 所有的任务还在排队中,且当前的 newTask 就是最早过期的那个
  if (isHostTimeoutScheduled) {
    cancelHostTimeout()
  } else {
    isHostTimeoutScheduled = true
  }
  requestHostTimeout(handleTimeout, startTime - currentTime)
}

清除计时器:

1
2
3
4
5
6
let taskTimeoutID = -1;

function cancelHostTimeout() {
  clearTimeout(taskTimeoutID)
  taskTimeoutID = -1
}

taskTimeoutID 这个又是哪里用了?

正是 requestHostTimeout 中的计时器 ID。

requestHostTimeout()

启动一个计时器去执行 callback

1
2
3
4
5
function requestHostTimeout(callback, ms) {
  taskTimeoutID = setTimeout(() => {
    callback(getCurrentTime());
  }, ms);
}

requestHostCallback()

-> schedulePerformWorkUntilDeadline()

1
2
3
4
5
6
7
function requestHostCallback(callback) {
  scheduledHostCallback = callback;
  if (!isMessageLoopRunning) {
    isMessageLoopRunning = true;
    schedulePerformWorkUntilDeadline();
  }
}

用 scheduledHostCallback 来保存当前正在执行的任务(work),它实际是一个对 flushWork() 函数的引用,因为 requestHostCallback(flushWork) 传入的参数是 flushWork 这个函数,它是用来 flush 当前队列中任务的(work),后面会讲到。

isMessageLoopRunning: 标记正在 flush 队列中的任务。

schedulePerformWorkUntilDeadline

这是个发起任务执行的函数,并且这个函数根据环境的不同,使用的方案不一,主要有三种 情况(这里直接使用 MessageChannel 方案,这也是为何要先去简要的学习了下它的原因)。

  1. Node.js 和 IE 环境:使用 setImmediate

  2. MessageChannel, 消息通道

  3. 最后方案是 setTimeout,由于 4ms 问题所以比 MessageChannel 优先级低

下面的实现做了简化:

1
2
3
4
5
6
7
8
9
// 省略环境的检查,直接使用 DOM 和 Worker 环境,注释中说更
// 偏向用 MessageChannel 是因为 setTimeout 4ms 的问题
// 原本的检查优化级: setImmediate > MessageChannel > setTimeout
let schedulePerformWorkUntilDeadline = (() => {
  const channel = new MessageChannel()
  const port = channel.port2
  channel.port1.onmessage = performWorkUntilDealine
  return () => port.postMessage(null)
})()

这等于是说 schedulePerformWorkUntilDeadline 其实是一个管道的一个端口 port2,每次 调用都会往 port1 发送一条含 null 信息的消息,其目的就是去触发 performWorkUntilDealine 函数执行(更多有关 MessageChannel)。

管道特征是你发送一条它就会接受一条,是一个典型的 FIFO 的队列模型,下面可以做个简 单的测试:

PORT2->PORT1发送消息 {{i}} 清空消息

完整版本:

localSetImmediate 就是 setImmediate

localSetTimeout 就是 setTimeout

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
let schedulePerformWorkUntilDeadline;
if (typeof localSetImmediate === 'function') {
  // Node.js and old IE.
  // There's a few reasons for why we prefer setImmediate.
  //
  // Unlike MessageChannel, it doesn't prevent a Node.js process from exiting.
  // (Even though this is a DOM fork of the Scheduler, you could get here
  // with a mix of Node.js 15+, which has a MessageChannel, and jsdom.)
  // https://github.com/facebook/react/issues/20756
  //
  // But also, it runs earlier which is the semantic we want.
  // If other browsers ever implement it, it's better to use it.
  // Although both of these would be inferior to native scheduling.
  schedulePerformWorkUntilDeadline = () => {
    localSetImmediate(performWorkUntilDeadline);
  };
} else if (typeof MessageChannel !== 'undefined') {
  // DOM and Worker environments.
  // We prefer MessageChannel because of the 4ms setTimeout clamping.
  const channel = new MessageChannel();
  const port = channel.port2;
  channel.port1.onmessage = performWorkUntilDeadline;
  schedulePerformWorkUntilDeadline = () => {
    port.postMessage(null);
  };
} else {
  // We should only fallback here in non-browser environments.
  schedulePerformWorkUntilDeadline = () => {
    localSetTimeout(performWorkUntilDeadline, 0);
  };
}

performWorkUntilDeadline

这个函数是管道方式, schedulePerformWorkUntilDeadline 做为 channel.port2 发出信 号给做为另一端 channel.port1 的 performWorkUntilDeadline 去执行。

而这个函数里面的工作其实已经执行 scheduledHostCallback 也就是传递给 requestHostCallback(flushWork)flushWork 这个函数,而这个函数里又做了什么?

scheduledHostCallback === flushWork

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
const performWorkUntilDeadline = () => {
  if (scheduledHostCallback !== null) {
    const currentTime = getCurrentTime();
    // Yield after `yieldInterval` ms, regardless of where we are in the vsync
    // cycle. This means there's always time remaining at the beginning of
    // the message event.
    deadline = currentTime + yieldInterval;
    const hasTimeRemaining = true;

    // If a scheduler task throws, exit the current browser task so the
    // error can be observed.
    //
    // Intentionally not using a try-catch, since that makes some debugging
    // techniques harder. Instead, if `scheduledHostCallback` errors, then
    // `hasMoreWork` will remain true, and we'll continue the work loop.
    let hasMoreWork = true;
    try {
      hasMoreWork = scheduledHostCallback(hasTimeRemaining, currentTime);
    } finally {
      if (hasMoreWork) {
        // If there's more work, schedule the next message event at the end
        // of the preceding one.
        schedulePerformWorkUntilDeadline();
      } else {
        isMessageLoopRunning = false;
        scheduledHostCallback = null;
      }
    }
  } else {
    isMessageLoopRunning = false;
  }
  // Yielding to the browser will give it a chance to paint, so we can
  // reset this.
  needsPaint = false;
}

这个函数里有几个要点:

  1. deadline, 这个用来标记截止时间,时间一到会停止管道消息,这是个时间戳值(deadline = currentTime + yieldInterval;)

    let yieldInterval = 5; 初始值是 5ms,也就是在这 5ms 时间内能做的尽量去做?

  2. 注意这里使用的是 try…finally 而不是 try…catch 因为它不仅仅只是处理错误情 况

    而是不论当前的 work 执行结果是正常还是异常都要做一些后续或者清理工作,比如: 重置 isMessageLoopRunning=false 好让管道能继续接受信号,否则管道等于是 channel.port1 端永远不会有新的信号进来。

    isMessageLoopRunning 是管道能否继续接受信号的开关。

flushWork(hasTimeRemaining, initialTime)

flushWork 工作:

  1. 调用 workLoop(hasTimeRemaining, initialTime) flush taskQueue 队列中的任务

  2. 重置 isHostCallbackScheduled=false 标记,让 scheduleCallback 中在 taskQueue 入列的同时能启动 flushWork 执行去 flush tasks

  3. 检查 isHostTimeoutScheduled 是不是有 timerQueue 中的任务已经启动了,如果是则 取消它的执行,让当前的 taskQueue 先执行

  4. 在执行之前设置 isPerformingWork=true 标记已经有任务在执行了,阻止 scheduleCallback 中 taskQueue 的任务启动(结合 isHostCallbackScheduled)

  5. try…finally 去执行 workLoop(hasTimeRemaining, initialTime) 同样要做清理工作, 重置 isPerformingWork=false 标记当前工作已经完成了,可以触发新的 taskQueue 执 行了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
function flushWork(hasTimeRemaining, initialTime) {

  isHostCallbackScheduled = false
  if (isHostTimeoutScheduled) {
    // 如果此时有一个未来时间的任务存在计时中,要取消它,先执行 host callback
    isHostTimeoutScheduled = false
    cancelHostTimeout()
  }

  isPerformingWork = true
  const previousPriorityLevel = currentPriorityLevel
  try {
    return workLoop(hasTimeRemaining, initialTime)
  } finally {
    // 清理工作
    currentTask = null
    currentPriorityLevel = previousPriorityLevel
    isPerformingWork = false
  }
}

这里还分别用 currentTaskcurrentPriorityLevel 记录了当前任务及其优先级。

workLoop 执行完了 finally 里面做些清理工作。

TIP

flushWork 执行的是 taskQueue 中的任务,timerQueue 中的任务在 scheduleCallback 中 push 的时候有条件时就会触发(计时器延时方式触发)。

workLoop(hasTimeRemaining, initialTime)

简化版本(省略 while 循环中的代码):

  1. 通过一个 while 循环去处理 taskQueue 中的任务

  2. 如果 while 退出之后,发现还有任务(currentTask !== null) 直接返回 false 重新走 管道消息的流程,回到这里的 while 去处理该 task

  3. 如果 taskQueue 中没有了任务,那接下来要去触发 timerQueue 中的任务了 (setTimeout方式触发),同时返回 false 标记当次已经完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
function workLoop(hasTimeRemaining, initialTime) {
  let currentTime = initialTime
  advanceTimers(currentTime)
  // 取出队列中第一个任务 taskQueue[0]
  currentTask = peek(taskQueue)
  while (currentTask !== null/*省略debug的条件*/) {
    // TODO
  }
  // 不管有没任务都退出
  if (currentTask !== null) {
    return true
  } else {
    // 到这里说明 taskQueue 清空了,该到 timerQueue 中的任务了
    const firstTimer = peek(timerQueue)
    if (firstTime !== null) {
      requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime)
    }
    return false
  }
}

注意 workLoop 的返回值标示着当前空闲时间内有没有更多的任务需要去执行,这个体现在 performWorkUntilDeadline 函数中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
let hasMoreWork = true
try {
  hasMoreWork = scheduledHostCallback(hasTimeRemaining, currentTime)
} finally {
  if (hasMoreWork) {
    // 无论如何都要执行,看是不是有更多的任务待处理
    schedulePerformWorkUntilDeadline()
  } else {
    // 完成了一轮
    isMessageLoopRunning = false
    // 准备接受下一个 flushWork
    scheduledHostCallback = null
  }
}

如果有会重新调用 schedulePerformWorkUntilDeadline() 即 PORT1 向 PORT2 发送一个 null 信号,重新走 performWorkUntilDeadline() 流程,直到 workLoop 中返回 false 为 至。

TIP

也就是说管道一旦接受到了信号开始就会一直重复接受信号的流程,直到没有要处理的任务之后 结束,也就是 workLoop 返回 false, hasMoreWork 为 false 的时候。

那为什么 while 循环结束了后面的 currentTask 值不会是 null ?

workLoop 完整版本(while循环):

  1. while 中限制了只有 currentTask.expirationTime > currentTime 且有足够的时间执 行的时候才会继续下去,否则直接退出 while(这里就是上面问题的答案)

  2. 当时间充足时, callback 不是函数会直接被丢弃掉(pop(taskQueue))

  3. 当 callback 是函数时会被执行得到其结果,也就是 callback() 执行后的返回值 continuationCallback

  4. 当 continuationCallback 也是一个函数时会继续 while 循环来执行这个 continuationCallback, 注意这个时候的任务 currentTask 还在 taskQueue 中,并且 依旧是在 currentTask 这个任务循环中

  5. 经过 4 之后此时的 task 还是最开始 callback 对应的 currentTask, 只不过它的此 时 currentTask.callback 已经是 continuationCallback 了,所以这一步执行的 currentTask.callback() 实际上已经是 continuationCallback() 直到 callback() 返 回值不是函数为止

    例如:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    var fn1 = () => {/*1*/}, fn2 = () => fn1, fn3 = () => fn2
    var callback = () => {/*...*/ return fn3}
    
    newTask = { ..., callback, ... }
    
    // 进入 while
    currentTask = newTask
    c = currentTask.callback() // -> fn3 -> fn2 -> fn1
    // -> 继续 while 循环, c 的值会是, currentTask 此时依旧是那个 newTask
    // c = fn3
    // c = fn2
    // c = fn1
    // c = undefined
    // 到此结束 currentTask
    // pop currentTask
    // 取下一个 task -> while
    
  6. 当 continuationCallback 不是函数说明 currentTask 已经完成了,需要将它从队列中 移除((currentTask === peek(taskQueue)) -> pop(taskQueue))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
function workLoop(hasTimeRemaining, initialTime) {
  let currentTime = initialTime
  advanceTimers(currentTime)
  // 取出队列中第一个任务 taskQueue[0]
  currentTask = peek(taskQueue)
  while (currentTask !== null/*省略debug的条件*/) {
    if (currentTask.expirationTime > currentTime && (
      !hasTimeRemaining || shouldYieldToHost()
    )) {
      // 任务还没过期且没有多余的时间去执行它了,所以要退出等下次有充足的时间再说
      break
    }

    // 时间充足
    const callback = currentTask.callback
    if (typeof callback === 'function') {
      currentTask.callback = null
      currentPriorityLevel = currentTask.priorityLevel
      // 已经过期了
      const didUserCallbackTimeout = currentTask.expirationTime <= currentTime
      // 执行任务函数
      const continuationCallback = callback(didUserCallbackTimeout)
      // 重新取一次时间, callback 调用可能比较耗时
      currentTime = getCurrentTime()
      if (typeof continuationCallback === 'function') {
        // 如果任务函数本身返回了一个函数,当作下一个任务处理,即 callback 返回的
        // 函数会在它执行退出之后立即被执行
        currentTask.callback = continuationCallback
      } else {
        if (currentTask === peek(taskQueue)) {
          // 执行完之后丢掉
          pop(taskQueue)
        }
      }
      advanceTimers(currentTime)
    } else {
      // 不是函数丢弃掉,pop 就是取第一个出来,然后最后一个放到 heap[0]
      // 进行 siftDown(heap, node, 0)
      pop(taskQueue)
    }
    // 取下一个
    currentTask = peek(taskQueue)
  }
  // 不管有没任务都退出
  if (currentTask !== null) {
    return true
  } else {
    // 到这里说明 taskQueue 清空了,该到 timerQueue 中的任务了
    const firstTimer = peek(timerQueue)
    if (firstTime !== null) {
      requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime)
    }
    return false
  }
}

workLoop while 中关键点:

  1. 必须是已经过期了的任务且当前要有足够的空闲时间才会去执行当前的任务 currentTask,否则直接退出 while

  2. callback() 的返回值是不是一个函数,如果是会在当前 while->currentTask 中一次都 执行完之后 currentTask 才算结束

  3. 结束后用 pop(taskQueue) 移除 currentTask,继续下一个任务

  4. while 退出后,即使当前任务还在也要重新走一遍管道机制,即 workLoop 直接返回 true, 会导致 hasMoreWork=true 从而重新调用 schedulePerformWorkUntilDeadline() 向 PORT1 发信号重新走 performWorkUntilDeadline() -> flushWork() -> workLoop() 流程。

  5. 当 taskQueue 中已经没有任务了的时候,此时就该启动 timerQueue 中的任务执行了, 调用 requestHostTimeout() 其实就是 setTimeout, 返回 false 表示一个 taskQueue 处理阶段完成了。

QUESTION

❓❓❓ 这个 hasTimeRemaining 依据是什么,空闲时间又是多久?

advanceTimers(currentTime)

这个函数是用来检查 timerQueue 里面的任务有没有到了时间的,能到这个队列来说明入列 时它的 startTime > currentTime,到执行的时候 currentTime 已经更新了,此时 timerQueue 里面的任务肯定有些已经过期了,此时过期了的就需要放到 taskQueue 中去在 wookLoop 中有空隙的时间去立即执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
function advanceTimers(currentTime) {
  // 检查 timerQueue 中是不是有已经过期了的任务,将它们加入到 taskQueue 中
  // 去优先执行
  let timer = peek(timerQueue)
  while (timer !== null) {
    if (timer.callback === null) {
      // Timer was cancelled
      pop(timerQueue)
    } else if (timer.startTime <= currentTime) {
      // 时间到了,将它加入到 taskQueue
      pop(timerQueue)
      timer.sortIndex = timer.expirationTime
      push(taskQueue, timer)
    } else {
      // 还没过期,依旧等待
      return
    }
    timer = peek(timerQueue)
  }
}

小结

到这里一个基本完整的 Scheduler 就已经完成了,下面是整个过程的简要流程图( 完整图)

/img/react/scheduler-brief.svg

这里最主要的关键点在于 通过管道衔接了任务启动(requestHostCallback)和执行 (flushWork), 然后在 flushWork->workLoop 过程中通过空余时间决定任务是不是应该立即 执行,还是等到下次空隙去执行,且通过 startTime 和 expirationTime 来控制任务执行 的先后顺序,用两个队列来承载了两种不同类型的任务(taskQueue代表已经过期的任务, timerQueue 代表未过期的任务)。

taskQueue 在 workLoop 中通过 while 不断的在当前空隙时间内去 flush 掉,只有当 当前 taskQueue 中的所有任务都完成了之后,再去重启 timerQueue 的计时器延迟方式去 触发任务执行。

这里使用了 performance.now() 来取当前的时间戳,因为需要亚毫秒级的时间,相关的知 识点 Performance

shouldYieldToHost()

用来检测是不是应该暂停 taskQueue 的 flush 工作,这里有几个条件可以阻止 taskQueue 。

  1. 开启了 input pending 的话,这个时间不仅和 currentTime>=deadline 有关,还和 input pending 和 needsPaint 有关

    needsPaint 是在 requestPaint() 中才会被设置成 true 的全局变量,在 performWorkUntilDeadline 执行完成之后会被设置成 false

    总的下来应该暂停的条件是: 已经过了 deadline 且 needsPaint 或 有 input pending。

    如果两者都没有,很有可能此时还是有空隙时间,这个时间需要 用 currentTime>=maxYieldInterval(默认:300ms) 来看看是不是应该暂停,如果没超过 最大的暂停间隙,还是可以继续 flush task 的。

  2. 没有开启时,只需要检测当前时间是不是过了 deadline

deadline 是当 scheduleCallback 时有条件进入 requestHostCallback 去触发管理一端 PORT2 schedulePerformWorkUntilDeadline 向另一端 PORT1 performWorkUntilDeadline 发送信号,去开启 flushWork -> workLoop flush taskQueue 的时候,在 PORT1 执行端设 置的一个截止时间(deadline = currentTime + yieldInterval)

yieldInterval 默认是 5ms 的时间,也就是说在这时间内如果 taskQueue 队列还没有 flush 完就得暂停了,因为不能阻碍主线程的工作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
function shouldYieldToHost() {
  if (enableIsInputPending &&
    navigator?.scheduling?.isInputPending !== undefined) {
    const scheduling = navigator.scheduling
    const currentTime = getCurrentTime()
    if (currentTime >= deadline) {
      // 没时间了。我们可能想暂停对主要线程的控制,以便浏览器可以执行高优先级任务。
      // 主要的是渲染和用户输入。如果有悬而未决的渲染或悬而未决的输入,我们就应该暂停。
      // 但如果两者都没有,那么我们可以在保持响应性的同时减少暂停。不管怎样我们最终都
      // 需要暂停,因为可能有一个悬而未决的渲染不是伴随着对“requestPaint”或其他
      // 主线程任务的调用比如网络事件。
      if (needsPaint || scheduling.isInputPending()) {
        // 有一个 pending 的渲染或用户输入,应该暂停等待完成
        return true
      }

      // 没有 pending 输入,仅仅暂停 maxYieldInterval 时长
      return currentTime >= maxYieldInterval
    } else {
      // 在当前帧还有多余的时间,就不该暂停
      return false
    }
  } else {
    // isInputPending = false.
    // 因为没有什么其它的方式可以知道是不是有 pending input,
    // 所以这里要保证在 frame 的最后总是要暂停一下
    return getCurrentTime() >= deadline
  }
}

所以,这个函数检测的关键条件是 needsPaint 和 isInputPending,前置条件是有没超过 截止时间或最大间隙(maxYieldInterval:300ms)

handleTimeout(currentTime)

这个函数是和 requestHostTimeout 以及 timerQueue 应该滞后的任务有关的处理函数,在 调用 requestHostTimeout(handleTimeout, ms) 时间做为第一个参数传递的,所以要了解 下它是怎么处理 timerQueue 中的任务的(callback)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
function handleTimeout(currentTime) {
  isHostTimeoutScheduled = false
  advanceTimers(currentTime)

  // host callback 优先级更高,如果它还没完,这里就不该启动 host timeout
  if (!isHostCallbackScheduled) {
    if (peek(taskQueue) !== null) {
      // 在做之前先看下老大还有没其它指示,有的话就先做老大的任务
      isHostCallbackScheduled = true
      requestHostCallback(flushWork)
    } else {
      // taskQueue 清空了,才轮到 timerQueue
      const firstTimer = peek(timerQueue)
      if (firstTimer !== null) {
        requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime)
      }
    }
  }
}

/img/react/scheduler-request-timeout.svg

从图中可知,当 requestHostTimeout(handleTimeout, ms) 一旦开始执行之后,就会一直 在用计时器不断的在 handleTimeout(currentTime)requestHostTimeout 之间来回递归, 中间如果 taskQueue 不空,则中断去 flush taskQueue,否则一直刷完 timerQueue 为止。

INFO

其实到这里,Scheduler 已经基本完成了,从 scheduleCallback 到对 taskQueuetimerQueue 的入列,再到检测启动。然后使用 MessageChannel 管道机制实现任务启动 和执行,再到具体的 flushWork 触发 workLoop 去循环 flush taskQueue 中已经过期应该 且有执行时机(一帧的空闲时间)的任务,最后不断的重复管道机制 flushWork -> workLoop -> port1-2 -> flushWork -> … 在当前时间片内去 flush 掉所有能处理的任务。这其中 有一个很重要的关键点就是所有的任务都必须走管道机制去执行 callback (只此一条路), 虽然 timerQueque 走的是 requestHostTimeout 路线,但是最终会在 handleTimeout 中被 过滤出来到 taskQueue 中以 taskQueue 方式去完成,使用 requestHostTimeout 延迟方式 无非是用 setTimeout 中去异步执行罢了。

测试

测试基于官方的用例: scheduler/src/__tests__/Scheduler-test.js

结合浏览器测试(日志请从下往上看):

测试代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Vue.createApp({
  template: `
<div class="mark" style="max-height:500px;overflow:scroll;margin-bottom:10px">
  <p v-for="log in logs" v-html="log"/>
</div>`,
  setup() {
    log.info(`start --------->`)

    Vue.onMounted(() => {
      // 在 deadline 之前就结束的任务
      test(`task that finishes before deadline`, NormalPriority, () => {
        log.event('Task1')
      })
      test('task with continuation', NormalPriority, () => {
        log.event('Task2')
        let i = 0
        while (shouldYield()) {
          log.event(`${i}: should yield ?`)
          if (++i >= 4) break
        }
        log.info(`Yield at ${performance.now()}ms`)
        return () => log.event('Continuation')
      })

    })

    return {
      log, logs
    }
  }
}).use(ElementPlus).mount('#xGUn4jG')

function test(mark, priority, callback) {
  log.se(`>>>>>>>>> start: ${mark}`)
  scheduleCallback(priority, () => {
    var continuation = callback()
    log.se(`<<<<<<<<< end: ${mark}`)
    return continuation
  })
}

结果分析(嫌罗嗦直接看上面流程图):

Stage1:

>>>>>>>>> start: task that finishes before deadline 对应第一个 test()

Stage2:[scheduleCallback]

3719.9000000059605: {"isMessageLoopRunning":false}

[requestHostCallback]

3719.300000011921: {"isHostCallbackScheduled":false,"isPerformingWork":false}

[taskQueue pushed]

3717.0999999940395: {"id":1,"priorityLevel":3,"startTime":3690.300000011921,"expirationTime":8690.300000011921,"sortIndex":-1}

3707.5: {"priorityLevel":3,"currentTime":3690.300000011921,"startTime":3690.300000011921}

[scheduleCallback]

执行 scheduleCallback -> 因为 "startTime":3690.300000011921<now:3717.0999999940395 最终加入到 taskQueue 然后执 行 requestHostCallback()

Stage3: requestHostCallback -> schedulePerformWorkUntilDeadline 最后执行 port.postMessage(null) 给 PORT1 发消息,此时 requestHostCallback 已经完成了,但 是 PORT1 接受消息的过程并不影响 requestHostCallback() 后面的代码执行(也就是说同 步代码优先)

3623.4000000059605: {"yieldInterval":5,"deadline":0,"isMessageLoopRunning":true}

[performWorkUntilDeadline:flushWork]

3332.5999999940395: {"isHostCallbackScheduled":true,"isPerformingWork":false}

[taskQueue pushed]

3332.4000000059605: {"id":2,"priorityLevel":3,"startTime":3332.199999988079,"expirationTime":8332.199999988079,"sortIndex":-1}

3332.2999999821186: {"priorityLevel":3,"currentTime":3332.199999988079,"startTime":3332.199999988079}

[scheduleCallback]

>>>>>>>>> start: task with continuation

3331.9000000059605: {"message":"PORT2: send message `null` -> PORT1"}

[schedulePerformWorkUntilDeadline]

3331.699999988079: {"isMessageLoopRunning":false}

[requestHostCallback]

Stage4: 继续 task1 的 流程,PORT1 收到消息执行 performWorkUntilDeadline 一直到 workLoop 结束

3630.0999999940395: {"hasMoreWork":true}

[flushWork]

[workLoop]

3629.5: {"shouldYield":true}

[workLoop]

3629.2999999821186: {"currentTime":3629.2999999821186,"deadline":3628.5}

3629.0999999940395: {"enableIsInputPending":false,"maxYieldInterval":300,"deadline":3628.5}

[shouldYieldToHost]

3628.699999988079: {"id":1,"priorityLevel":3,"startTime":3329.2999999821186,"expirationTime":8329.299999982119,"sortIndex":8329.299999982119}

3628.699999988079: {"currentTime":3623.5,"hasTimeRemaining":true,"initialTime":3623.5}

[workLoop]

3628.199999988079: {"currentTime":3623.5}

[advanceTimers]

3623.699999988079: {"hasTimeRemaining":true,"initialTime":3623.5,"isHostTimeoutScheduled":false}

[flushWork]

3623.4000000059605: {"yieldInterval":5,"deadline":0,"isMessageLoopRunning":true}

[performWorkUntilDeadline:flushWork]

注意看上面的第一行,表示 taskQueue 中还有任务,其实就是第二个 test() 的 task2,因 为入列的时候 task1 正在处理,所以它只是单纯的做了入列并没有启动。

Stage5: 开始轮到 task2 执行了,直到下面 task2 callback 执行

[scheduler:pop]

<<<<<<<<< end: task that finishes before deadline

7257.9000000059605: task callback called: Task1

...

Stage6: 注意看最后的输出,经过 Stage5 之后并没结束,因为 test2 中的 callback 返 回值是一个函数 return () => log.event('Continuation') 这个函数会在下一次 while 循环中被处理,之后才结束。

对应的函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
function requestHostCallback(callback/*flushWork*/) {
  scheduledHostCallback = callback
  window.__log('requestHostCallback', { isMessageLoopRunning })
  if (!isMessageLoopRunning) {
    isMessageLoopRunning = true
    schedulePerformWorkUntilDeadline()
  }
}

let schedulePerformWorkUntilDeadline = (() => {
  const channel = new MessageChannel()
  const port = channel.port2
  channel.port1.onmessage = performWorkUntilDeadline
  return () => {
    window.__log('schedulePerformWorkUntilDeadline', {
      message: 'PORT2: send message `null` -> PORT1'
    })
    port.postMessage(null)
  }
})()

即 task2 scheduleCallback -> taskQueue pushed -> 此时检测到 "isMessageLoopRunning":true 不会直接触发 requestHostCallback() 而是让 task1 的流程继续:PORT1 接受到消息执行 performWorkUntilDeadline


[workLoop]

3629.5: {"shouldYield":true}

[workLoop]

3629.2999999821186: {"currentTime":3629.2999999821186,"deadline":3628.5}

3629.0999999940395: {"enableIsInputPending":false,"maxYieldInterval":300,"deadline":3628.5}

[shouldYieldToHost]

3628.699999988079: {"id":1,"priorityLevel":3,"startTime":3329.2999999821186,"expirationTime":8329.299999982119,"sortIndex":8329.299999982119}

3628.699999988079: {"currentTime":3623.5,"hasTimeRemaining":true,"initialTime":3623.5}

[workLoop]

3628.199999988079: {"currentTime":3623.5}

[advanceTimers]

3623.699999988079: {"hasTimeRemaining":true,"initialTime":3623.5,"isHostTimeoutScheduled":false}

[flushWork]

3623.4000000059605: {"yieldInterval":5,"deadline":0,"isMessageLoopRunning":true}

[performWorkUntilDeadline:flushWork]

MessageChannel 执行先后问题测试:

>>>>>>>>> start: task with continuation 输出发生在 schedulePerformWorkUntilDeadline 之后,并不是直接调用 performWorkUntilDeadline 因 为 MessageChannel 发送消息时并不会阻碍后面同步代码的执行,如下点击“发送”按钮可测 试日志输出顺序。

测试代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
(function() {
Vue.createApp({
template: `
<el-button @click="start" plain type="primary">发送</el-button>
<div class="mark">
<p v-for="log in logs" v-html="log"/>
</div>
`,
setup() {
    const logs = Vue.reactive(['--- begin ---'])

    const channel = new MessageChannel()
    const i = Vue.ref(0)
    channel.port2.onmessage = () => logs.push(`${i.value}: message from PORT1...`)

    return {
    logs,
    start() {
        const val = ++i.value
        channel.port1.postMessage(`${val}: message to PORT2`)
        logs.push(`${val}: should this log before channel ?`)
        setTimeout(() => logs.push(`${val}: should timeout before channel ?`))
    }
    }
}

}).use(ElementPlus).mount('#xqoc5YN')
}())

结论:

同步代码 > channel receiver > setTimeout

结语

完整的 scheduler.js

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
const NoPriority = 0;
const ImmediatePriority = 1;
const UserBlockingPriority = 2;
const NormalPriority = 3;
const LowPriority = 4;
const IdlePriority = 5;

window.__log = window.__log || function() { }

// Node Heap //////////////////////////////////////////////////////////////////
function push(heap, node) {
  const index = heap.length
  heap.push(node)
  siftUp(heap, node, index)
}

function peek(heap) {
  return heap.length === 0 ? null : heap[0]
}

function pop(heap) {
  if (heap.length === 0) {
    return null
  }

  const first = heap[0]
  const last = heap.pop()
  window.__log('scheduler:pop', first)
  if (last !== first) {
    heap[0] = last
    siftDown(heap, last, 0)
  }
  return first
}


function siftUp(heap, node, i) {
  let index = i;
  while (index > 0) {
    const parentIndex = (index - 1) >>> 1
    const parent = heap[parentIndex]
    if (compare(parent, node) > 0) {
      // 找到比 node.id/sortIndex 更大的节点,然后交换
      heap[parentIndex] = node
      heap[index] = parent
      index = parentIndex
    } else {
      // 排序完成,没有更大的了
      return
    }
  }
}

function siftDown(heap, node, i) {
  let index = i
  const length = heap.length
  const halfLength = length >>> 1
  while (index < halfLength) {
    const leftIndex = (index + 1) * 2 - 1
    const left = heap[leftIndex]
    const rightIndex = leftIndex + 1
    const right = heap[rightIndex]

    if (compare(left, node) < 0) {
      if (rightIndex < length && compare(right, left) < 0) {
        heap[index] = right
        heap[rightIndex] = node
        index = rightIndex
      } else {
        heap[index] = left
        heap[leftIndex] = node
        index = leftIndex
      }
    } else if (rightIndex < length && compare(right, node) < 0) {
      heap[index] = right;
      heap[rightIndex] = node;
      index = rightIndex;

    } else {
      // Neither child is smaller. Exit.
      return
    }
  }
}

function compare(a, b) {
  // 先比较 sort index 然后比较 task id
  const diff = a.sortIndex - b.sortIndex
  return diff !== 0 ? diff : a.id - b.id
}

// Scheduler //////////////////////////////////////////////////////////////////
// 浏览器环境的 performance 对象, 省略其它判断...
let getCurrentTime = () => performance.now()
// Incrementing id counter. Used to maintain insertion order.
var taskIdCounter = 1;
// Tasks are stored on a min heap
var taskQueue = [];
var timerQueue = [];

// Pausing the scheduler is useful for debugging.
var isSchedulerPaused = false;

// 当前正在执行的任务及其优先级
var currentTask = null;
var currentPriorityLevel = NormalPriority;


// This is set while performing work, to prevent re-entrancy.
var isPerformingWork = false;

// 已经过期的任务是不是正在被执行
var isHostCallbackScheduled = false;
// 还没过期的任务是不是正在被执行
var isHostTimeoutScheduled = false;

function advanceTimers(currentTime) {
  window.__log('advanceTimers', { currentTime })
  // 检查 timerQueue 中是不是有已经过期了的任务,将它们加入到 taskQueue 中
  // 去优先执行
  let timer = peek(timerQueue)
  while (timer !== null) {
    if (timer.callback === null) {
      // Timer was cancelled
      pop(timerQueue)
    } else if (timer.startTime <= currentTime) {
      window.__log({ title: 'timer 过期,进入 taskQueue', ...timer })
      // 时间到了,将它加入到 taskQueue
      pop(timerQueue)
      timer.sortIndex = timer.expirationTime
      push(taskQueue, timer)
    } else {
      // 还没过期,依旧等待
      return
    }
    timer = peek(timerQueue)
  }
}

function handleTimeout(currentTime) {
  isHostTimeoutScheduled = false
  advanceTimers(currentTime)

  window.__log('handleTimeout', { currentTime })
  // host callback 优先级更高,如果它还没完,这里就不该启动 host timeout
  if (!isHostCallbackScheduled) {
    if (peek(taskQueue) !== null) {
      // 在做之前先看下老大还有没其它指示,有的话就先做老大的任务
      isHostCallbackScheduled = true
      window.__log('flush taskQueue')
      requestHostCallback(flushWork)
    } else {
      // taskQueue 清空了,才轮到 timerQueue
      const firstTimer = peek(timerQueue)
      if (firstTimer !== null) {
        window.__log('flush timerQueue')
        requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime)
      }
    }
  }
}

function flushWork(hasTimeRemaining, initialTime) {
  window.__log('flushWork', { hasTimeRemaining, initialTime, isHostTimeoutScheduled })

  isHostCallbackScheduled = false
  if (isHostTimeoutScheduled) {
    // 如果此时有一个未来时间的任务存在计时中,要取消它,先执行 host callback
    isHostTimeoutScheduled = false
    cancelHostTimeout()
  }

  isPerformingWork = true
  const previousPriorityLevel = currentPriorityLevel
  try {
    return workLoop(hasTimeRemaining, initialTime)
  } finally {
    // 清理工作
    window.__log('flushWork', 'finally clean work')
    currentTask = null
    currentPriorityLevel = previousPriorityLevel
    isPerformingWork = false
  }
}

function workLoop(hasTimeRemaining, initialTime) {
  let currentTime = initialTime
  advanceTimers(currentTime)
  // 取出队列中第一个任务 taskQueue[0]
  currentTask = peek(taskQueue)
  window.__log('workLoop', { currentTime, hasTimeRemaining, initialTime }, currentTask)

  while (currentTask !== null && !((enableSchedulerDebugging && isSchedulerPaused))/*省略debug的条件*/) {
    const shouldYield = shouldYieldToHost() // for log
    window.__log('workLoop', { shouldYield })
    if (currentTask.expirationTime > currentTime && (
      !hasTimeRemaining || shouldYield /*shouldYieldToHost()*/
    )) {
      // 任务还没过期且没有多余的时间去执行它了,所以要退出等下次有充足的时间再说
      break
    }

    // 时间充足
    const callback = currentTask.callback
    if (typeof callback === 'function') {
      window.__log('workLoop', `callback: ${callback} || anonymous`)
      currentTask.callback = null
      currentPriorityLevel = currentTask.priorityLevel
      // 已经过期了
      const didUserCallbackTimeout = currentTask.expirationTime <= currentTime
      // 执行任务函数
      const continuationCallback = callback(didUserCallbackTimeout)
      // 重新取一次时间, callback 调用可能比较耗时
      currentTime = getCurrentTime()
      if (typeof continuationCallback === 'function') {
        // 如果任务函数本身返回了一个函数,当作下一个任务处理,即 callback 返回的
        // 函数会在它执行退出之后立即被执行
        currentTask.callback = continuationCallback
      } else {
        if (currentTask === peek(taskQueue)) {
          // 执行完之后丢掉
          pop(taskQueue)
        }
      }
      advanceTimers(currentTime)
    } else {
      // 不是函数丢弃掉,pop 就是取第一个出来,然后最后一个放到 heap[0]
      // 进行 siftDown(heap, node, 0)
      pop(taskQueue)
    }
    // 取下一个
    currentTask = peek(taskQueue)
  }

  window.__log('workLoop', 'exit while...')

  // 不管有没任务都退出
  if (currentTask !== null) {
    return true
  } else {
    // 到这里说明 taskQueue 清空了,该到 timerQueue 中的任务了
    const firstTimer = peek(timerQueue)
    window.__log('workLoop', { t: 'first timeQueue task', firstTimer })
    if (firstTimer !== null) {
      requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime)
    }
    return false
  }
}

function scheduleCallback(priorityLevel, callback, options) {
  var currentTime = getCurrentTime()

  var startTime // 任务执行的开始时间
  if (typeof options === 'object' && options !== null) {
    var delay = options.delay
    if (typeof delay === 'number' && delay > 0) {
      startTime = currentTime + delay
    } else {
      startTime = currentTime
    }
  } else {
    startTime = currentTime
  }

  var timeout // 根据优化级设置超时时间
  switch (priorityLevel) {
    case ImmediatePriority:
      timeout = -1
      break
    case UserBlockingPriority:
      timeout = 250
      break
    case IdlePriority:
      // Max 31 bit integer. The max integer size in V8 for 32-bit systems.
      // Math.pow(2, 30) - 1
      // 0b111111111111111111111111111111
      timeout = 1073741823
      break
    case LowPriority:
      timeout = 10000
      break
    case NormalPriority:
      timeout = 5000
      break
  }

  window.__log('scheduleCallback', { priorityLevel, currentTime, startTime })

  // 过期时间
  var expirationTime = startTime + timeout

  // 封装新任务
  var newTask = {
    id: taskIdCounter++,
    callback,
    priorityLevel,
    startTime,
    expirationTime,
    sortIndex: -1
  }

  window.__log(newTask)
  if (startTime > currentTime) {
    // 延迟的任务,应该进入队列排队,用肇始时间做索引
    newTask.sortIndex = startTime
    push(timerQueue, newTask)
    // peek 取队列中第一个任务 queue[0]
    if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
      // queue: [null, newTask] 情况
      // 所有的任务还在排队中,且当前的 newTask 就是最早过期的那个
      if (isHostTimeoutScheduled) {
        cancelHostTimeout()
      } else {
        isHostTimeoutScheduled = true
      }
      requestHostTimeout(handleTimeout, startTime - currentTime)
    }
  } else {
    newTask.sortIndex = expirationTime
    push(taskQueue, newTask)
    // Schedule a host callback, if needed. If we're already performing work,
    // wait until the next time we yield.
    window.__log('taskQueue pushed', { isHostCallbackScheduled, isPerformingWork })
    if (!isHostCallbackScheduled && !isPerformingWork) {
      isHostCallbackScheduled = true
      requestHostCallback(flushWork)
    }
  }

  return newTask
}

let isMessageLoopRunning = false;
let taskTimeoutID = -1;
// 当前正在 flush 的任务流
let scheduledHostCallback = null

// scheduler 会周期性的暂停以防主线程上有正在执行的其它工作,例如:用户事件等
// 默认情况下,每一帧会暂停多次。它并不会试图去结合帧边界,因为大多数的工作并不
// 需要这么做,如果有必要的会用 requestAnimationFrame
let yieldInterval = 5;
let deadline = 0;

// TODO: Make this configurable
// TODO: Adjust this based on priority?
const maxYieldInterval = 300;
let needsPaint = false;

// flags
let enableIsInputPending = false
let enableSchedulerDebugging = false

function shouldYieldToHost() {
  window.__log('shouldYieldToHost', {
    enableIsInputPending, maxYieldInterval, deadline
  })

  if (enableIsInputPending &&
    navigator?.scheduling?.isInputPending !== undefined) {
    const scheduling = navigator.scheduling
    const currentTime = getCurrentTime()
    if (currentTime >= deadline) {
      window.__log({ currentTime, deadline, needsPaint })
      window.__log(scheduling)
      // 没时间了。我们可能想暂停对主要线程的控制,以便浏览器可以执行高优先级任务。
      // 主要的是渲染和用户输入。如果有悬而未决的渲染或悬而未决的输入,我们就应该暂停。
      // 但如果两者都没有,那么我们可以在保持响应性的同时减少暂停。不管怎样我们最终都
      // 需要暂停,因为可能有一个悬而未决的渲染不是伴随着对“requestPaint”或其他
      // 主线程任务的调用比如网络事件。
      if (needsPaint || scheduling.isInputPending()) {
        // 有一个 pending 的渲染或用户输入,应该暂停等待完成
        return true
      }

      // 没有 pending 输入,仅仅暂停 maxYieldInterval 时长
      return currentTime >= maxYieldInterval
    } else {
      // 在当前帧还有多余的时间,就不该暂停
      return false
    }
  } else {
    const currentTime = getCurrentTime()
    window.__log({ currentTime, deadline })
    // isInputPending = false.
    // 因为没有什么其它的方式可以知道是不是有 pending input,
    // 所以这里要保证在 frame 的最后总是要暂停一下
    return currentTime >= deadline
  }
}

const performWorkUntilDeadline = () => {
  window.__log('performWorkUntilDeadline:flushWork', {
    yieldInterval, deadline, isMessageLoopRunning
  })
  if (scheduledHostCallback !== null) {
    const currentTime = getCurrentTime()
    deadline = currentTime + yieldInterval
    const hasTimeRemaining = true

    // 如果 scheduler task 异常,退出当前的浏览器 task 以致 error 可以被观测到
    //
    // 注意不要使用 try...catch,而是要让程序继续执行下去
    let hasMoreWork = true
    try {
      hasMoreWork = scheduledHostCallback(hasTimeRemaining, currentTime)
      window.__log({ hasMoreWork })
    } finally {
      if (hasMoreWork) {
        // 无论如何都要执行,看是不是有更多的任务待处理
        schedulePerformWorkUntilDeadline()
      } else {
        // 完成了一轮
        isMessageLoopRunning = false
        // 准备接受下一个 flushWork
        scheduledHostCallback = null
      }
    }
  } else {
    // 标记当前空闲
    isMessageLoopRunning = false
  }

  // 暂停,会使浏览器有机会去渲染,所以要重围
  needsPaint = false
}
// 省略环境的检查,直接使用 DOM 和 Worker 环境,注释中说更
// 偏向用 MessageChannel 是因为 setTimeout 4ms 的问题
// 原本的检查优化级: setImmediate > MessageChannel > setTimeout
let schedulePerformWorkUntilDeadline = (() => {
  const channel = new MessageChannel()
  const port = channel.port2
  channel.port1.onmessage = performWorkUntilDeadline
  return () => {
    window.__log('schedulePerformWorkUntilDeadline', {
      message: 'PORT2: send message `null` -> PORT1'
    })
    port.postMessage(null)
  }
})()

function requestHostCallback(callback/*flushWork*/) {
  scheduledHostCallback = callback
  window.__log('requestHostCallback', { isMessageLoopRunning })
  if (!isMessageLoopRunning) {
    isMessageLoopRunning = true
    schedulePerformWorkUntilDeadline()
  }
}

function cancelHostTimeout() {
  window.__log('cancelHostTimeout')
  clearTimeout(taskTimeoutID)
  taskTimeoutID = -1
}

function requestHostTimeout(callback, ms) {
  taskTimeoutID = setTimeout(() => {
    callback(getCurrentTime())
  }, ms)
}


// 其它函数
function runWithPriority(priorityLevel, eventHandler) {
  switch (priorityLevel) {
    case ImmediatePriority:
    case UserBlockingPriority:
    case NormalPriority:
    case LowPriority:
    case IdlePriority:
      break;
    default:
      priorityLevel = NormalPriority;
  }

  var previousPriorityLevel = currentPriorityLevel;
  currentPriorityLevel = priorityLevel;

  try {
    return eventHandler();
  } finally {
    currentPriorityLevel = previousPriorityLevel;
  }
}

function next(eventHandler) {
  var priorityLevel;
  switch (currentPriorityLevel) {
    case ImmediatePriority:
    case UserBlockingPriority:
    case NormalPriority:
      // Shift down to normal priority
      priorityLevel = NormalPriority;
      break;
    default:
      // Anything lower than normal priority should remain at the current level.
      priorityLevel = currentPriorityLevel;
      break;
  }

  var previousPriorityLevel = currentPriorityLevel;
  currentPriorityLevel = priorityLevel;

  try {
    return eventHandler();
  } finally {
    currentPriorityLevel = previousPriorityLevel;
  }
}

function wrapCallback(callback) {
  var parentPriorityLevel = currentPriorityLevel;
  return function() {
    // This is a fork of runWithPriority, inlined for performance.
    var previousPriorityLevel = currentPriorityLevel;
    currentPriorityLevel = parentPriorityLevel;

    try {
      return callback.apply(this, arguments);
    } finally {
      currentPriorityLevel = previousPriorityLevel;
    }
  };
}

function continueExecution() {
  isSchedulerPaused = false;
  if (!isHostCallbackScheduled && !isPerformingWork) {
    isHostCallbackScheduled = true;
    requestHostCallback(flushWork);
  }
}


function pauseExecution() {
  isSchedulerPaused = true;
}

function getCurrentPriorityLevel() {
  return currentPriorityLevel;
}

function getFirstCallbackNode() {
  return peek(taskQueue);
}



// SchedulerPostTask.js ///////////////////////////////////////////////////////
function shouldYield() {
  const current = getCurrentTime()
  window.__log('shouldYield', { current, deadline })
  return current >= deadline
}

重要函数和流程(回顾)

task 队列管理:

  1. siftUp(heap, node, i)

  2. siftDown(heap, node, i)

  3. push(heap, node) -> siftUp

  4. pop(heap) -> siftDown

  5. compare(a,b), 根据 sortIndex 和 id 比较大小

全局变量:

  1. taskIdCounter=1

  2. taskQueue=[]

  3. timerQueue=[]

  4. isSchedulerPaused=false

  5. currentTask=null

  6. currentPriorityLevel=NormalPriority

  7. isPerformingWork=false

  8. isHostCallbackScheduled=false

  9. isHostTimeoutScheduled=false

  10. isMessageLoopRunning=false

  11. taskTimeoutID=-1

  12. scheduledHostCallback=null

  13. yieldInterval=5

  14. deadline=0

  15. maxYieldInterval=300

  16. needsPaint=false

  17. enableIsInputPending=false

  18. enableSchedulerDebugging=false

函数:

  1. getCurrentTime = () => performance.now()

  2. advanceTimers(currentTime)

  3. handleTimeout(currentTime)

  4. flushWork(hasTimeRemaining, initialTime)

  5. workLoop(hasTimeRemaining, initialTime)

  6. scheduleCallback(priorityLevel, callback, options)

  7. shouldYieldToHost()

  8. performWorkUntilDeadline()

  9. schedulePerformWorkUntilDeadline()

  10. requestHostCallback(callback/*flushWork*/)

  11. cancelHostTimeout()

  12. requestHostTimeout(callback, ms)

  13. runWithPriority(priorityLevel, eventHandler)

函数调用过程(详情:小结):

/img/react/scheduler-request-timeout.svg