Skip to content

Commit

Permalink
Update examples
Browse files Browse the repository at this point in the history
  • Loading branch information
pitr-ch committed Jun 13, 2016
1 parent 1f86680 commit dc521fb
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 54 deletions.
2 changes: 2 additions & 0 deletions examples/init.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
def do_stuff
:stuff
end

Concurrent.use_stdlib_logger Logger::DEBUG
44 changes: 34 additions & 10 deletions examples/promises.in.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

### Simple asynchronous task

future = future { sleep 0.1; 1 + 1 } # evaluation starts immediately
future = future(0.1) { |duration| sleep duration; :result } # evaluation starts immediately
future.completed?
# block until evaluated
future.value
Expand All @@ -21,6 +21,7 @@
# re-raising
raise future rescue $!


### Direct creation of completed futures

succeeded_future(Object.new)
Expand Down Expand Up @@ -120,7 +121,7 @@
### Completable Future and Event

future = completable_future
event = event()
event = completable_event()

# These threads will be blocked until the future and event is completed
t1 = Thread.new { future.value } #
Expand Down Expand Up @@ -205,17 +206,40 @@


# periodic task
DONE = Concurrent::AtomicBoolean.new false

def schedule_job
schedule(1) { do_stuff }.
rescue { |e| StandardError === e ? report_error(e) : raise(e) }.
then { schedule_job unless DONE.true? }
def schedule_job(interval, &job)
# schedule the first execution and chain restart og the job
Concurrent.schedule(interval, &job).chain do |success, continue, reason|
if success
schedule_job(interval, &job) if continue
else
# handle error
p reason
# retry
schedule_job(interval, &job)
end
end
end

schedule_job
DONE.make_true
queue = Queue.new
count = 0

schedule_job 0.05 do
queue.push count
count += 1
# to continue scheduling return true, false will end the task
if count < 4
# to continue scheduling return true
true
else
queue.push nil
# to end the task return false
false
end
end

# read the queue
arr, v = [], nil; arr << v while (v = queue.pop) #
arr

# How to limit processing where there are limited resources?
# By creating an actor managing the resource
Expand Down
112 changes: 68 additions & 44 deletions examples/promises.out.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@

### Simple asynchronous task

future = future { sleep 0.1; 1 + 1 } # evaluation starts immediately
# => <#Concurrent::Promises::Future:0x7fc5cc1e5340 pending blocks:[]>
future = future(0.1) { |duration| sleep duration; :result } # evaluation starts immediately
# => <#Concurrent::Promises::Future:0x7fa602198e30 pending blocks:[]>
future.completed? # => false
# block until evaluated
future.value # => 2
future.value # => :result
future.completed? # => true


### Failing asynchronous task

future = future { raise 'Boom' }
# => <#Concurrent::Promises::Future:0x7fc5cc1dc808 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa602188a58 pending blocks:[]>
future.value # => nil
future.value! rescue $! # => #<RuntimeError: Boom>
future.reason # => #<RuntimeError: Boom>
# re-raising
raise future rescue $! # => #<RuntimeError: Boom>


### Direct creation of completed futures

succeeded_future(Object.new)
# => <#Concurrent::Promises::Future:0x7fc5cc1c6030 success blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa60217bf10 success blocks:[]>
failed_future(StandardError.new("boom"))
# => <#Concurrent::Promises::Future:0x7fc5cc1c50b8 failed blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa60217aa48 failed blocks:[]>

### Chaining of futures

Expand Down Expand Up @@ -68,7 +69,7 @@
# => 3

failing_zip = succeeded_future(1) & failed_future(StandardError.new('boom'))
# => <#Concurrent::Promises::Future:0x7fc5cc11ec90 failed blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa602129300 failed blocks:[]>
failing_zip.result # => [false, [1, nil], [nil, #<StandardError: boom>]]
failing_zip.then { |v| 'never happens' }.result # => [false, [1, nil], [nil, #<StandardError: boom>]]
failing_zip.rescue { |a, b| (a || b).message }.value
Expand All @@ -81,28 +82,28 @@

# will not evaluate until asked by #value or other method requiring completion
future = delay { 'lazy' }
# => <#Concurrent::Promises::Future:0x7fc5cc0ff660 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa60211a490 pending blocks:[]>
sleep 0.1
future.completed? # => false
future.value # => "lazy"

# propagates trough chain allowing whole or partial lazy chains

head = delay { 1 }
# => <#Concurrent::Promises::Future:0x7fc5cc0fc938 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa602113898 pending blocks:[]>
branch1 = head.then(&:succ)
# => <#Concurrent::Promises::Future:0x7fc5cc0df068 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa602112a60 pending blocks:[]>
branch2 = head.delay.then(&:succ)
# => <#Concurrent::Promises::Future:0x7fc5cc0dd178 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa602111188 pending blocks:[]>
join = branch1 & branch2
# => <#Concurrent::Promises::Future:0x7fc5cc0dc430 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa6021102b0 pending blocks:[]>

sleep 0.1 # nothing will complete # => 0
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]

branch1.value # => 2
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
# => 0
# => 1
[head, branch1, branch2, join].map(&:completed?) # => [true, true, false, false]

join.value # => [2, 2]
Expand All @@ -125,14 +126,14 @@

# it'll be executed after 0.1 seconds
scheduled = schedule(0.1) { 1 }
# => <#Concurrent::Promises::Future:0x7fc5caaae028 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa60288a810 pending blocks:[]>

scheduled.completed? # => false
scheduled.value # available after 0.1sec # => 1

# and in chain
scheduled = delay { 1 }.schedule(0.1).then(&:succ)
# => <#Concurrent::Promises::Future:0x7fc5caa9f2d0 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa603101638 pending blocks:[]>
# will not be scheduled until value is requested
sleep 0.1
scheduled.value # returns after another 0.1sec # => 2
Expand All @@ -141,36 +142,36 @@
### Completable Future and Event

future = completable_future
# => <#Concurrent::Promises::CompletableFuture:0x7fc5caa8eae8 pending blocks:[]>
event = event()
# => <#Concurrent::Promises::CompletableEvent:0x7fc5caa8d648 pending blocks:[]>
# => <#Concurrent::Promises::CompletableFuture:0x7fa6030e0ac8 pending blocks:[]>
event = completable_event()
# => <#Concurrent::Promises::CompletableEvent:0x7fa6030e0a78 pending blocks:[]>

# These threads will be blocked until the future and event is completed
t1 = Thread.new { future.value }
t2 = Thread.new { event.wait }

future.success 1
# => <#Concurrent::Promises::CompletableFuture:0x7fc5caa8eae8 success blocks:[]>
# => <#Concurrent::Promises::CompletableFuture:0x7fa6030e0ac8 success blocks:[]>
future.success 1 rescue $!
# => #<Concurrent::MultipleAssignmentError: Future can be completed only once. Current result is [true, 1, nil], trying to set [true, 1, nil]>
future.try_success 2 # => false
event.complete
# => <#Concurrent::Promises::CompletableEvent:0x7fc5caa8d648 completed blocks:[]>
# => <#Concurrent::Promises::CompletableEvent:0x7fa6030e0a78 success blocks:[]>

# The threads can be joined now
[t1, t2].each &:join


### Callbacks

queue = Queue.new # => #<Thread::Queue:0x007fc5caa770c8>
queue = Queue.new # => #<Thread::Queue:0x007fa6030ab5f8>
future = delay { 1 + 1 }
# => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]>

future.on_success { queue << 1 } # evaluated asynchronously
# => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]>
future.on_success! { queue << 2 } # evaluated on completing thread
# => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]>

queue.empty? # => true
future.value # => 2
Expand All @@ -188,15 +189,15 @@
# executed on executor for blocking and long operations
then_on(:io) { File.read __FILE__ }.
wait
# => <#Concurrent::Promises::Future:0x7fc5cb010b10 success blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa60307abb0 success blocks:[]>


### Interoperability with actors

actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
-> v { v ** 2 }
end
# => #<Concurrent::Actor::Reference:0x7fc5caa35268 /square (Concurrent::Actor::Utils::AdHoc)>
# => #<Concurrent::Actor::Reference:0x7fa601ab8ea8 /square (Concurrent::Actor::Utils::AdHoc)>


future { 2 }.
Expand All @@ -210,51 +211,74 @@
### Interoperability with channels

ch1 = Concurrent::Channel.new
# => #<Concurrent::Channel:0x007fc5ca9f4448 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fc5ca9f41f0 @__lock__=#<Mutex:0x007fc5ca9f4128>, @__condition__=#<Thread::ConditionVariable:0x007fc5ca9f4088>, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fc5cc0ce970@/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
# => #<Concurrent::Channel:0x007fa601a6a2d0 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fa601a6a230 @__lock__=#<Mutex:0x007fa601a6a168>, @__condition__=#<Thread::ConditionVariable:0x007fa601a6a140>, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fa6028b12d0@/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
ch2 = Concurrent::Channel.new
# => #<Concurrent::Channel:0x007fc5cc89a528 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fc5cc89a2a8 @__lock__=#<Mutex:0x007fc5cc89a140>, @__condition__=#<Thread::ConditionVariable:0x007fc5cc89a078>, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fc5cc0ce970@/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
# => #<Concurrent::Channel:0x007fa601a69380 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fa601a69308 @__lock__=#<Mutex:0x007fa601a69268>, @__condition__=#<Thread::ConditionVariable:0x007fa601a69218>, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fa6028b12d0@/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>

result = select(ch1, ch2)
# => <#Concurrent::Promises::Future:0x7fc5cc892e40 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa601a62d50 pending blocks:[]>
ch1.put 1 # => true
result.value!
# => [1, #<Concurrent::Channel:0x007fc5ca9f4448 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fc5ca9f41f0 @__lock__=#<Mutex:0x007fc5ca9f4128>, @__condition__=#<Thread::ConditionVariable:0x007fc5ca9f4088>, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fc5cc0ce970@/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>]
# => [1, #<Concurrent::Channel:0x007fa601a6a2d0 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fa601a6a230 @__lock__=#<Mutex:0x007fa601a6a168>, @__condition__=#<Thread::ConditionVariable:0x007fa601a6a140>, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fa6028b12d0@/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>]


future { 1+1 }.
then_put(ch1)
# => <#Concurrent::Promises::Future:0x7fc5cc87b920 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa601a51910 pending blocks:[]>
result = future { '%02d' }.
then_select(ch1, ch2).
then { |format, (value, channel)| format format, value }
# => <#Concurrent::Promises::Future:0x7fc5cc862f60 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa601a2b008 pending blocks:[]>
result.value! # => "02"


### Common use-cases Examples

# simple background processing
future { do_stuff }
# => <#Concurrent::Promises::Future:0x7fc5cc080518 pending blocks:[]>
# => <#Concurrent::Promises::Future:0x7fa601a1b478 pending blocks:[]>

# parallel background processing
jobs = 10.times.map { |i| future { i } }
zip(*jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


# periodic task
DONE = Concurrent::AtomicBoolean.new false # => #<Concurrent::AtomicBoolean:0x007fc5cc079a38>

def schedule_job
schedule(1) { do_stuff }.
rescue { |e| StandardError === e ? report_error(e) : raise(e) }.
then { schedule_job unless DONE.true? }
def schedule_job(interval, &job)
# schedule the first execution and chain restart og the job
Concurrent.schedule(interval, &job).chain do |success, continue, reason|
if success
schedule_job(interval, &job) if continue
else
# handle error
p reason
# retry
schedule_job(interval, &job)
end
end
end # => :schedule_job

schedule_job
# => <#Concurrent::Promises::Future:0x7fc5ca9949d0 pending blocks:[]>
DONE.make_true # => true
queue = Queue.new # => #<Thread::Queue:0x007fa6020d2cf8>
count = 0 # => 0

schedule_job 0.05 do
queue.push count
count += 1
# to continue scheduling return true, false will end the task
if count < 4
# to continue scheduling return true
true
else
queue.push nil
# to end the task return false
false
end
end
# => <#Concurrent::Promises::Future:0x7fa6020c23d0 pending blocks:[]>

# read the queue
arr, v = [], nil; arr << v while (v = queue.pop)
arr # => [0, 1, 2, 3]

# How to limit processing where there are limited resources?
# By creating an actor managing the resource
Expand All @@ -265,7 +289,7 @@ def schedule_job
data[message]
end
end
# => #<Concurrent::Actor::Reference:0x7fc5cc843318 /db (Concurrent::Actor::Utils::AdHoc)>
# => #<Concurrent::Actor::Reference:0x7fa6019b8788 /db (Concurrent::Actor::Utils::AdHoc)>

concurrent_jobs = 11.times.map do |v|

Expand Down Expand Up @@ -295,7 +319,7 @@ def schedule_job
end
end
end
# => #<Concurrent::Actor::Reference:0x7fc5ca89a160 /DB-pool (Concurrent::Actor::Utils::Pool)>
# => #<Concurrent::Actor::Reference:0x7fa60284b278 /DB-pool (Concurrent::Actor::Utils::Pool)>

concurrent_jobs = 11.times.map do |v|

Expand Down

0 comments on commit dc521fb

Please sign in to comment.