Skip to content

Commit

Permalink
Debug on travis
Browse files Browse the repository at this point in the history
  • Loading branch information
pitr-ch committed Dec 24, 2016
1 parent 399d276 commit f6bc576
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ env:
before_script:
- "echo $JAVA_OPTS"

script: if [[ -v TRUFFLE ]]; then support/test-truffle.sh; else RUBYOPT=-w bundle exec rake ci; fi
script: if [[ -v TRUFFLE ]]; then support/test-truffle.sh; else RUBYOPT='-w -d' bundle exec rake ci; fi
36 changes: 20 additions & 16 deletions lib/concurrent/atomic/ruby_thread_local_var.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class RubyThreadLocalVar < AbstractThreadLocalVar
# array, so we don't leak memory

# @!visibility private
FREE = []
LOCK = Mutex.new
FREE = []
LOCK = Mutex.new
ARRAYS = {} # used as a hash set
@@next = 0
private_constant :FREE, :LOCK, :ARRAYS
Expand Down Expand Up @@ -72,9 +72,9 @@ def value=(value)
def allocate_storage
@index = LOCK.synchronize do
FREE.pop || begin
result = @@next
@@next += 1
result
result = @@next
@@next += 1
result
end
end
ObjectSpace.define_finalizer(self, self.class.threadlocal_finalizer(@index))
Expand All @@ -83,13 +83,15 @@ def allocate_storage
# @!visibility private
def self.threadlocal_finalizer(index)
proc do
LOCK.synchronize do
FREE.push(index)
# The cost of GC'ing a TLV is linear in the number of threads using TLVs
# But that is natural! More threads means more storage is used per TLV
# So naturally more CPU time is required to free more storage
ARRAYS.each_value do |array|
array[index] = nil
Thread.new do # avoid error: can't be called from trap context
LOCK.synchronize do
FREE.push(index)
# The cost of GC'ing a TLV is linear in the number of threads using TLVs
# But that is natural! More threads means more storage is used per TLV
# So naturally more CPU time is required to free more storage
ARRAYS.each_value do |array|
array[index] = nil
end
end
end
end
Expand All @@ -98,10 +100,12 @@ def self.threadlocal_finalizer(index)
# @!visibility private
def self.thread_finalizer(array)
proc do
LOCK.synchronize do
# The thread which used this thread-local array is now gone
# So don't hold onto a reference to the array (thus blocking GC)
ARRAYS.delete(array.object_id)
Thread.new do # avoid error: can't be called from trap context
LOCK.synchronize do
# The thread which used this thread-local array is now gone
# So don't hold onto a reference to the array (thus blocking GC)
ARRAYS.delete(array.object_id)
end
end
end
end
Expand Down
52 changes: 26 additions & 26 deletions spec/concurrent/actor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def initialize(queue)
def on_message(message)
case message
when :child
AdHoc.spawn(:pong, @queue) { |queue| -> m { queue << m } }
AdHoc.spawn!(:pong, @queue) { |queue| -> m { queue << m } }
else
@queue << message
message
Expand All @@ -33,16 +33,16 @@ def on_message(message)
end

it 'forbids Immediate executor' do
expect { Utils::AdHoc.spawn name: 'test', executor: ImmediateExecutor.new }.to raise_error
expect { Utils::AdHoc.spawn! name: 'test', executor: ImmediateExecutor.new }.to raise_error
end

describe 'spawning' do
describe 'Actor#spawn' do
describe 'Actor#spawn!' do
behaviour = -> v { -> _ { v } }
subjects = { spawn: -> { Actor.spawn(AdHoc, :ping, 'arg', &behaviour) },
context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) },
spawn_by_hash: -> { Actor.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) },
context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } }
subjects = { spawn: -> { Actor.spawn!(AdHoc, :ping, 'arg', &behaviour) },
context_spawn: -> { AdHoc.spawn!(:ping, 'arg', &behaviour) },
spawn_by_hash: -> { Actor.spawn!(class: AdHoc, name: :ping, args: ['arg'], &behaviour) },
context_spawn_by_hash: -> { AdHoc.spawn!(name: :ping, args: ['arg'], &behaviour) } }

subjects.each do |desc, subject_definition|
describe desc do
Expand Down Expand Up @@ -89,14 +89,14 @@ def on_message(message)
end

it 'terminates on failed message processing' do
a = AdHoc.spawn(name: :fail, logger: Concurrent::NULL_LOGGER) { -> _ { raise } }
a = AdHoc.spawn!(name: :fail, logger: Concurrent::NULL_LOGGER) { -> _ { raise } }
expect(a.ask(nil).wait.rejected?).to be_truthy
expect(a.ask!(:terminated?)).to be_truthy
end
end

describe 'messaging' do
subject { AdHoc.spawn(:add) { c = 0; -> v { c = c + v } } }
subject { AdHoc.spawn!(:add) { c = 0; -> v { c = c + v } } }
specify do
subject.tell(1).tell(1)
subject << 1 << 1
Expand All @@ -107,10 +107,10 @@ def on_message(message)

describe 'children' do
let(:parent) do
AdHoc.spawn(:parent) do
AdHoc.spawn!(:parent) do
-> message do
if message == :child
AdHoc.spawn(:child) { -> _ { parent } }
AdHoc.spawn!(:child) { -> _ { parent } }
else
children
end
Expand All @@ -128,7 +128,7 @@ def on_message(message)
end

describe 'envelope' do
subject { AdHoc.spawn(:subject) { -> _ { envelope } } }
subject { AdHoc.spawn!(:subject) { -> _ { envelope } } }
specify do
envelope = subject.ask!('a')
expect(envelope).to be_a_kind_of Envelope
Expand All @@ -142,8 +142,8 @@ def on_message(message)

describe 'termination' do
subject do
AdHoc.spawn(:parent) do
child = AdHoc.spawn(:child) { -> v { v } }
AdHoc.spawn!(:parent) do
child = AdHoc.spawn!(:child) { -> v { v } }
-> v { child }
end
end
Expand Down Expand Up @@ -171,8 +171,8 @@ def on_message(message)

describe 'message redirecting' do
let(:parent) do
AdHoc.spawn(:parent) do
child = AdHoc.spawn(:child) { -> m { m+1 } }
AdHoc.spawn!(:parent) do
child = AdHoc.spawn!(:child) { -> m { m+1 } }
-> message do
if message == :child
child
Expand All @@ -192,9 +192,9 @@ def on_message(message)
queue = Queue.new
failure = nil
# FIXME this leads to weird message processing ordering
# failure = AdHoc.spawn(:failure) { -> m { terminate! } }
# failure = AdHoc.spawn!(:failure) { -> m { terminate! } }
monitor = AdHoc.spawn!(:monitor) do
failure = AdHoc.spawn(:failure) { -> m { m } }
failure = AdHoc.spawn!(:failure) { -> m { m } }
failure << :link
-> m { queue << [m, envelope.sender] }
end
Expand All @@ -209,7 +209,7 @@ def on_message(message)
queue = Queue.new
failure = nil
monitor = AdHoc.spawn!(:monitor) do
failure = AdHoc.spawn(name: :failure, link: true) { -> m { m } }
failure = AdHoc.spawn!(name: :failure, link: true) { -> m { m } }
-> m { queue << [m, envelope.sender] }
end

Expand All @@ -225,8 +225,8 @@ def on_message(message)
queue = Queue.new
resuming_behaviour = Behaviour.restarting_behaviour_definition(:resume!)

test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do
actor = AdHoc.spawn name: :pausing, behaviour_definition: Behaviour.restarting_behaviour_definition do
test = AdHoc.spawn! name: :tester, behaviour_definition: resuming_behaviour do
actor = AdHoc.spawn! name: :pausing, behaviour_definition: Behaviour.restarting_behaviour_definition do
queue << :init
-> m { m == :add ? 1 : pass }
end
Expand All @@ -248,8 +248,8 @@ def on_message(message)

it 'pauses on error and resets' do
queue = Queue.new
test = AdHoc.spawn name: :tester, behaviour_definition: Behaviour.restarting_behaviour_definition do
actor = AdHoc.spawn name: :pausing, behaviour_definition: Behaviour.restarting_behaviour_definition do
test = AdHoc.spawn! name: :tester, behaviour_definition: Behaviour.restarting_behaviour_definition do
actor = AdHoc.spawn! name: :pausing, behaviour_definition: Behaviour.restarting_behaviour_definition do
queue << :init
-> m { m == :object_id ? self.object_id : pass }
end
Expand Down Expand Up @@ -284,9 +284,9 @@ def on_message(message)
end
end

test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do
test = AdHoc.spawn! name: :tester, behaviour_definition: resuming_behaviour do

actor = AdHoc.spawn name: :pausing,
actor = AdHoc.spawn! name: :pausing,
behaviour_definition: Behaviour.restarting_behaviour_definition do
queue << :init
-> m { m == :add ? 1 : pass }
Expand Down Expand Up @@ -316,7 +316,7 @@ def on_message(message)
it 'supports asks', buggy: true do
children = Queue.new
pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |index|
worker = Concurrent::Actor::Utils::AdHoc.spawn name: "worker-#{index}", supervised: true do
worker = Concurrent::Actor::Utils::AdHoc.spawn! name: "worker-#{index}", supervised: true do
lambda do |message|
fail if message == :fail
5 + message
Expand Down

0 comments on commit f6bc576

Please sign in to comment.