-
Notifications
You must be signed in to change notification settings - Fork 418
/
ruby_thread_local_var.rb
172 lines (152 loc) · 5.03 KB
/
ruby_thread_local_var.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
require 'thread'
require 'concurrent/atomic/abstract_thread_local_var'
module Concurrent

  # @!visibility private
  # @!macro internal_implementation_note
  class RubyThreadLocalVar < AbstractThreadLocalVar

    # Storage model
    #
    # Values are kept in one lazily created Array per thread, stored in that
    # thread's own thread-local storage. Every variable owns a single integer
    # slot index, and that same index addresses the variable's value in EVERY
    # thread's array.
    #
    # Keeping the data per THREAD rather than per variable means ordinary
    # reads and writes touch only the calling thread's array, so #value and
    # #value= need no synchronization on that path.
    #
    # Reclamation
    #
    # When a variable is garbage collected, 1) its index is handed back for
    # reuse by later variables (otherwise the per-thread arrays would grow
    # forever), and 2) the slot is nulled out in every known array (so the
    # old values can be collected, and so a later variable that reuses the
    # index never sees a stale value).
    #
    # Nulling out slots requires a registry of ALL per-thread arrays; that is
    # what ARRAYS is for. When a Thread is garbage collected its array is
    # dropped from the registry so that the array does not leak.

    # @!visibility private
    FREE = []        # slot indices released by collected variables
    LOCK = Mutex.new # guards FREE, ARRAYS and @@next
    ARRAYS = {}      # array.object_id => array; used as a hash set
    @@next = 0       # next slot index that has never been handed out

    private_constant :FREE, :LOCK, :ARRAYS

    # @!macro [attach] thread_local_var_method_initialize
    #
    #   Creates a thread local variable.
    #
    #   @param [Object] default the default value when otherwise unset
    def initialize(default = nil)
      @default = default
      allocate_storage
    end

    # @!macro thread_local_var_method_get
    def value
      read_slot(get_threadlocal_array)
    end

    # @!macro thread_local_var_method_set
    def value=(value)
      current = Thread.current

      # The per-thread arrays could live in a Hash keyed by Thread, but every
      # access would then need the lock; Ruby's built-in thread-local storage
      # avoids that.
      slots = get_threadlocal_array(current)
      unless slots
        # First write from this thread: create its array, register it so that
        # variable finalizers can clear slots in it, and arrange for it to be
        # unregistered once the thread itself is collected.
        slots = set_threadlocal_array([], current)
        LOCK.synchronize { ARRAYS[slots.object_id] = slots }
        ObjectSpace.define_finalizer(current, self.class.thread_finalizer(slots))
      end

      # A raw nil in a slot means "unset", so an explicit nil is stored as the
      # sentinel instead.
      slots[@index] = value.nil? ? NIL_SENTINEL : value
      value
    end

    # @!macro thread_local_var_method_bind
    def bind(value, &block)
      return unless block_given?

      previous = self.value
      begin
        self.value = value
        yield
      ensure
        # Put back whatever the calling thread saw before, even on error.
        self.value = previous
      end
    end

    # Builds the finalizer run when a variable is garbage collected: it
    # returns the variable's slot index to FREE and clears that slot in every
    # registered per-thread array.
    #
    # The cost is linear in the number of threads that hold an array, which
    # mirrors the amount of storage being released.
    #
    # Created here, in class scope, so that the closure captures only the
    # index and not the variable being finalized.
    #
    # NOTE(review): this acquires LOCK from inside a finalizer; confirm that
    # finalizers never run in a context where Mutex#lock is disallowed.
    #
    # @!visibility private
    def self.threadlocal_finalizer(index)
      proc do
        LOCK.synchronize do
          FREE.push(index)
          ARRAYS.each_value { |slots| slots[index] = nil }
        end
      end
    end

    # Builds the finalizer run when a thread is garbage collected: the thread
    # that owned +array+ is gone, so the array is removed from the registry
    # and no longer kept alive by it.
    #
    # @!visibility private
    def self.thread_finalizer(array)
      proc do
        LOCK.synchronize { ARRAYS.delete(array.object_id) }
      end
    end

    protected

    # Reserves this variable's slot index (a recycled one when available,
    # otherwise the next unused one) and registers the finalizer that gives
    # the index back.
    #
    # @!visibility private
    def allocate_storage
      @index = LOCK.synchronize do
        recycled = FREE.pop
        if recycled
          recycled
        else
          fresh = @@next
          @@next += 1
          fresh
        end
      end
      ObjectSpace.define_finalizer(self, self.class.threadlocal_finalizer(@index))
    end

    private

    # Use Thread#thread_variable_get/set when the running Ruby provides them,
    # and fall back to Thread#[] / Thread#[]= otherwise.
    if Thread.method_defined?(:thread_variable_get)

      def get_threadlocal_array(thread = Thread.current)
        thread.thread_variable_get(:__threadlocal_array__)
      end

      def set_threadlocal_array(array, thread = Thread.current)
        thread.thread_variable_set(:__threadlocal_array__, array)
      end

    else

      def get_threadlocal_array(thread = Thread.current)
        thread[:__threadlocal_array__]
      end

      def set_threadlocal_array(array, thread = Thread.current)
        thread[:__threadlocal_array__] = array
      end

    end

    # This exists only for use in testing
    # @!visibility private
    def value_for(thread)
      read_slot(get_threadlocal_array(thread))
    end

    # Decodes this variable's slot from a per-thread array.
    #
    # Returns the default when the thread has no array yet or the slot is
    # unset (nil), nil when the slot holds NIL_SENTINEL, and the stored
    # object otherwise.
    #
    # @!visibility private
    def read_slot(array)
      return @default unless array

      stored = array[@index]
      return @default if stored.nil?

      stored.equal?(NIL_SENTINEL) ? nil : stored
    end
  end
end