-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathBlockingStackImpl.kt
More file actions
97 lines (76 loc) · 2.68 KB
/
BlockingStackImpl.kt
File metadata and controls
97 lines (76 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import java.util.concurrent.atomic.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.suspendCoroutine
import kotlin.coroutines.resume
/**
 * Concurrent stack whose [pop] suspends the calling coroutine instead of failing when no
 * element is available, and whose [push] hands its element directly to a suspended popper
 * when one exists.
 *
 * Two structures cooperate:
 *  - a linked stack of nodes ([head]) holding either pushed elements or `SUSPENDED` markers, and
 *  - a FIFO linked queue of continuations (the "synchronizer") holding suspended [pop] callers.
 *
 * [elements] counts started pushes minus started pops; a negative value means that many pops
 * have no matching push yet.
 */
class BlockingStackImpl<E> : BlockingStack<E> {
    // ==========================
    // Segment Queue Synchronizer
    // ==========================

    // NOTE: the names are the reverse of their roles. `enqIdx` is the HEAD sentinel of the waiter
    // queue (resume() removes waiters after it); `deqIdx` is the TAIL (suspend() appends after it).
    private val enqIdx: AtomicReference<Resumer<E>>
    private val deqIdx: AtomicReference<Resumer<E>>

    init {
        // Both ends start at a shared dummy node that carries no continuation.
        val dummy = Resumer<E>(null)
        enqIdx = AtomicReference(dummy)
        deqIdx = AtomicReference(dummy)
    }

    /**
     * Node of the waiter queue.
     *
     * @property action continuation to resume; `null` only for the initial dummy node.
     * @property next successor in the queue; set at most once, from `null` to a node.
     */
    private class Resumer<E>(
        val action: Continuation<E>?,
        val next: AtomicReference<Resumer<E>?> = AtomicReference<Resumer<E>?>(null),
    )

    /**
     * Suspends the caller and appends its continuation to the tail of the waiter queue.
     *
     * @return the element later passed to [resume] for this waiter.
     */
    private suspend fun suspend(): E = suspendCoroutine { continuation ->
        // The node is only published by a successful CAS, so one allocation serves all retries.
        val waiter = Resumer(continuation)
        while (true) {
            val tail = deqIdx.get()
            if (tail.next.compareAndSet(null, waiter)) {
                // Linked; swing the tail forward. Failure means another thread already did it for us.
                deqIdx.compareAndSet(tail, waiter)
                break
            }
            // Somebody else linked a node after `tail` but the tail pointer still lags:
            // help advance it rather than spinning until that thread gets scheduled.
            tail.next.get()?.let { successor -> deqIdx.compareAndSet(tail, successor) }
        }
    }

    /**
     * Removes the oldest waiter from the queue and resumes it with [element].
     *
     * Spins until a waiter is present; callers invoke this only when a [pop] has committed
     * to suspending (via the counter or a `SUSPENDED` marker), so one will arrive.
     */
    private fun resume(element: E) {
        while (true) {
            val sentinel = enqIdx.get()
            val waiter = sentinel.next.get()
            // Dequeue only once the tail has moved past the sentinel, so the head never overtakes the tail.
            if (waiter != null && sentinel !== deqIdx.get() && enqIdx.compareAndSet(sentinel, waiter)) {
                waiter.action?.resume(element)
                return
            }
        }
    }

    // ==============
    // Blocking Stack
    // ==============

    // Top of the stack; null when it holds neither elements nor markers.
    private val head = AtomicReference<Node<E>?>()

    // Started pushes minus started pops.
    private val elements = AtomicInteger()

    /**
     * Pushes [element], or hands it straight to a waiting [pop] if there is one.
     */
    override fun push(element: E) {
        // Negative counter: a pop is already (or about to be) parked in the waiter queue.
        if (elements.getAndIncrement() < 0) {
            resume(element)
            return
        }
        while (true) {
            val top = head.get()
            if (top != null && top.element === SUSPENDED) {
                // A pop found nothing to take and left a marker: consume the marker and feed that pop.
                if (head.compareAndSet(top, top.next.get())) {
                    resume(element)
                    return
                }
                continue
            }
            if (head.compareAndSet(top, Node(element, AtomicReference<Node<E>?>(top)))) return
        }
    }

    /**
     * Removes and returns the top element, suspending until one is pushed if none is available.
     */
    @Suppress("UNCHECKED_CAST")
    override suspend fun pop(): E {
        // No push has been counted for this pop yet: wait in the queue.
        if (elements.getAndDecrement() <= 0) return suspend()
        while (true) {
            val top = head.get()
            if (top == null || top.element === SUSPENDED) {
                // The matching push has bumped the counter but not linked its node yet.
                // Leave a marker for it and wait. A marker already on top belongs to another
                // waiting pop and must NOT be taken as an element: stack ours above it instead.
                val marker = Node<E>(SUSPENDED, AtomicReference<Node<E>?>(top))
                if (head.compareAndSet(top, marker)) return suspend()
                continue
            }
            // `top` is a real element node here, so the cast back to E is sound.
            if (head.compareAndSet(top, top.next.get())) return top.element as E
        }
    }
}
/**
 * Link of the stack used by `BlockingStackImpl`.
 *
 * @property element payload of the node: either a pushed value or the `SUSPENDED` marker,
 *   which is why it is typed `Any?` rather than `E`.
 * @property next reference to the node below this one; holds `null` at the bottom.
 */
private class Node<E>(
    val element: Any?,
    val next: AtomicReference<Node<E>?> = AtomicReference(),
)
/**
 * Unique marker object stored as a `Node`'s element by `pop()` when it finds the stack empty;
 * `push()` recognises it by reference identity (`===`), so it can never collide with a user value.
 */
private val SUSPENDED: Any = Any()