Skip to content

Commit

Permalink
Moved thread local var classes into own files.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdantonio committed Aug 18, 2015
1 parent cf131ae commit 5946545
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 294 deletions.
40 changes: 40 additions & 0 deletions lib/concurrent/atomic/abstract_thread_local_var.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Concurrent

# @!macro thread_local_var
# @!macro internal_implementation_note
# @!visibility private
class AbstractThreadLocalVar

# @!visibility private
NIL_SENTINEL = Object.new
private_constant :NIL_SENTINEL

# @!macro thread_local_var_method_initialize
def initialize(default = nil)
@default = default
allocate_storage
end

# @!macro thread_local_var_method_get
def value
raise NotImplementedError
end

# @!macro thread_local_var_method_set
def value=(value)
raise NotImplementedError
end

# @!macro thread_local_var_method_bind
def bind(value, &block)
raise NotImplementedError
end

protected

# @!visibility private
def allocate_storage
raise NotImplementedError
end
end
end
50 changes: 50 additions & 0 deletions lib/concurrent/atomic/java_thread_local_var.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
require 'concurrent/atomic/abstract_thread_local_var'

if Concurrent.on_jruby?

module Concurrent

# @!visibility private
# @!macro internal_implementation_note
class JavaThreadLocalVar < AbstractThreadLocalVar

# @!macro thread_local_var_method_get
def value
value = @var.get

if value.nil?
@default
elsif value == NIL_SENTINEL
nil
else
value
end
end

# @!macro thread_local_var_method_set
def value=(value)
@var.set(value)
end

# @!macro thread_local_var_method_bind
def bind(value, &block)
if block_given?
old_value = @var.get
begin
@var.set(value)
yield
ensure
@var.set(old_value)
end
end
end

protected

# @!visibility private
def allocate_storage
@var = java.lang.ThreadLocal.new
end
end
end
end
172 changes: 172 additions & 0 deletions lib/concurrent/atomic/ruby_thread_local_var.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
require 'thread'
require 'concurrent/atomic/abstract_thread_local_var'

module Concurrent

# @!visibility private
# @!macro internal_implementation_note
class RubyThreadLocalVar < AbstractThreadLocalVar

# Each thread has a (lazily initialized) array of thread-local variable values
# Each time a new thread-local var is created, we allocate an "index" for it
# For example, if the allocated index is 1, that means slot #1 in EVERY
# thread's thread-local array will be used for the value of that TLV
#
# The good thing about using a per-THREAD structure to hold values, rather
# than a per-TLV structure, is that no synchronization is needed when
# reading and writing those values (since the structure is only ever
# accessed by a single thread)
#
# Of course, when a TLV is GC'd, 1) we need to recover its index for use
# by other new TLVs (otherwise the thread-local arrays could get bigger
# and bigger with time), and 2) we need to null out all the references
# held in the now-unused slots (both to avoid blocking GC of those objects,
# and also to prevent "stale" values from being passed on to a new TLV
# when the index is reused)
# Because we need to null out freed slots, we need to keep references to
# ALL the thread-local arrays -- ARRAYS is for that
# But when a Thread is GC'd, we need to drop the reference to its thread-local
# array, so we don't leak memory

# @!visibility private
FREE = []
LOCK = Mutex.new
ARRAYS = {} # used as a hash set
@@next = 0
private_constant :FREE, :LOCK, :ARRAYS

# @!macro [attach] thread_local_var_method_initialize
#
# Creates a thread local variable.
#
# @param [Object] default the default value when otherwise unset
def initialize(default = nil)
@default = default
allocate_storage
end

# @!macro thread_local_var_method_get
def value
if array = get_threadlocal_array
value = array[@index]
if value.nil?
@default
elsif value.equal?(NIL_SENTINEL)
nil
else
value
end
else
@default
end
end

# @!macro thread_local_var_method_set
def value=(value)
me = Thread.current
# We could keep the thread-local arrays in a hash, keyed by Thread
# But why? That would require locking
# Using Ruby's built-in thread-local storage is faster
unless array = get_threadlocal_array(me)
array = set_threadlocal_array([], me)
LOCK.synchronize { ARRAYS[array.object_id] = array }
ObjectSpace.define_finalizer(me, self.class.thread_finalizer(array))
end
array[@index] = (value.nil? ? NIL_SENTINEL : value)
value
end

# @!macro thread_local_var_method_bind
def bind(value, &block)
if block_given?
old_value = self.value
begin
self.value = value
yield
ensure
self.value = old_value
end
end
end

protected

# @!visibility private
def allocate_storage
@index = LOCK.synchronize do
FREE.pop || begin
result = @@next
@@next += 1
result
end
end
ObjectSpace.define_finalizer(self, self.class.threadlocal_finalizer(@index))
end

# @!visibility private
def self.threadlocal_finalizer(index)
proc do
LOCK.synchronize do
FREE.push(index)
# The cost of GC'ing a TLV is linear in the number of threads using TLVs
# But that is natural! More threads means more storage is used per TLV
# So naturally more CPU time is required to free more storage
ARRAYS.each_value do |array|
array[index] = nil
end
end
end
end

# @!visibility private
def self.thread_finalizer(array)
proc do
LOCK.synchronize do
# The thread which used this thread-local array is now gone
# So don't hold onto a reference to the array (thus blocking GC)
ARRAYS.delete(array.object_id)
end
end
end

private

if Thread.instance_methods.include?(:thread_variable_get)

def get_threadlocal_array(thread = Thread.current)
thread.thread_variable_get(:__threadlocal_array__)
end

def set_threadlocal_array(array, thread = Thread.current)
thread.thread_variable_set(:__threadlocal_array__, array)
end

else

def get_threadlocal_array(thread = Thread.current)
thread[:__threadlocal_array__]
end

def set_threadlocal_array(array, thread = Thread.current)
thread[:__threadlocal_array__] = array
end
end

# This exists only for use in testing
# @!visibility private
def value_for(thread)
if array = get_threadlocal_array(thread)
value = array[@index]
if value.nil?
@default
elsif value.equal?(NIL_SENTINEL)
nil
else
value
end
else
@default
end
end
end
end
Loading

0 comments on commit 5946545

Please sign in to comment.