From dc521fbbb5fdd77e0aaa127ba777428a2b5be293 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 11 Jun 2016 16:21:13 +0200 Subject: [PATCH] Update examples --- examples/init.rb | 2 + examples/promises.in.rb | 44 +++++++++++---- examples/promises.out.rb | 112 ++++++++++++++++++++++++--------------- 3 files changed, 104 insertions(+), 54 deletions(-) diff --git a/examples/init.rb b/examples/init.rb index c3ed8aafb..4fdb8550e 100644 --- a/examples/init.rb +++ b/examples/init.rb @@ -3,3 +3,5 @@ def do_stuff :stuff end + +Concurrent.use_stdlib_logger Logger::DEBUG diff --git a/examples/promises.in.rb b/examples/promises.in.rb index 8c2104bb1..5d09f4a72 100644 --- a/examples/promises.in.rb +++ b/examples/promises.in.rb @@ -5,7 +5,7 @@ ### Simple asynchronous task -future = future { sleep 0.1; 1 + 1 } # evaluation starts immediately +future = future(0.1) { |duration| sleep duration; :result } # evaluation starts immediately future.completed? # block until evaluated future.value @@ -21,6 +21,7 @@ # re-raising raise future rescue $! + ### Direct creation of completed futures succeeded_future(Object.new) @@ -120,7 +121,7 @@ ### Completable Future and Event future = completable_future -event = event() +event = completable_event() # These threads will be blocked until the future and event is completed t1 = Thread.new { future.value } # @@ -205,17 +206,40 @@ # periodic task -DONE = Concurrent::AtomicBoolean.new false - -def schedule_job - schedule(1) { do_stuff }. - rescue { |e| StandardError === e ? report_error(e) : raise(e) }. - then { schedule_job unless DONE.true? } +def schedule_job(interval, &job) + # schedule the first execution and chain restart og the job + Concurrent.schedule(interval, &job).chain do |success, continue, reason| + if success + schedule_job(interval, &job) if continue + else + # handle error + p reason + # retry + schedule_job(interval, &job) + end + end end -schedule_job -DONE.make_true +queue = Queue.new +count = 0 + +schedule_job 0.05 do + queue.push count + count += 1 + # to continue scheduling return true, false will end the task + if count < 4 + # to continue scheduling return true + true + else + queue.push nil + # to end the task return false + false + end +end +# read the queue +arr, v = [], nil; arr << v while (v = queue.pop) # +arr # How to limit processing where there are limited resources? # By creating an actor managing the resource diff --git a/examples/promises.out.rb b/examples/promises.out.rb index c6fd6d062..896fd6694 100644 --- a/examples/promises.out.rb +++ b/examples/promises.out.rb @@ -5,30 +5,31 @@ ### Simple asynchronous task -future = future { sleep 0.1; 1 + 1 } # evaluation starts immediately - # => <#Concurrent::Promises::Future:0x7fc5cc1e5340 pending blocks:[]> +future = future(0.1) { |duration| sleep duration; :result } # evaluation starts immediately + # => <#Concurrent::Promises::Future:0x7fa602198e30 pending blocks:[]> future.completed? # => false # block until evaluated -future.value # => 2 +future.value # => :result future.completed? # => true ### Failing asynchronous task future = future { raise 'Boom' } - # => <#Concurrent::Promises::Future:0x7fc5cc1dc808 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa602188a58 pending blocks:[]> future.value # => nil future.value! rescue $! # => # future.reason # => # # re-raising raise future rescue $! # => # + ### Direct creation of completed futures succeeded_future(Object.new) - # => <#Concurrent::Promises::Future:0x7fc5cc1c6030 success blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa60217bf10 success blocks:[]> failed_future(StandardError.new("boom")) - # => <#Concurrent::Promises::Future:0x7fc5cc1c50b8 failed blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa60217aa48 failed blocks:[]> ### Chaining of futures @@ -68,7 +69,7 @@ # => 3 failing_zip = succeeded_future(1) & failed_future(StandardError.new('boom')) - # => <#Concurrent::Promises::Future:0x7fc5cc11ec90 failed blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa602129300 failed blocks:[]> failing_zip.result # => [false, [1, nil], [nil, #]] failing_zip.then { |v| 'never happens' }.result # => [false, [1, nil], [nil, #]] failing_zip.rescue { |a, b| (a || b).message }.value @@ -81,7 +82,7 @@ # will not evaluate until asked by #value or other method requiring completion future = delay { 'lazy' } - # => <#Concurrent::Promises::Future:0x7fc5cc0ff660 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa60211a490 pending blocks:[]> sleep 0.1 future.completed? # => false future.value # => "lazy" @@ -89,20 +90,20 @@ # propagates trough chain allowing whole or partial lazy chains head = delay { 1 } - # => <#Concurrent::Promises::Future:0x7fc5cc0fc938 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa602113898 pending blocks:[]> branch1 = head.then(&:succ) - # => <#Concurrent::Promises::Future:0x7fc5cc0df068 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa602112a60 pending blocks:[]> branch2 = head.delay.then(&:succ) - # => <#Concurrent::Promises::Future:0x7fc5cc0dd178 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa602111188 pending blocks:[]> join = branch1 & branch2 - # => <#Concurrent::Promises::Future:0x7fc5cc0dc430 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa6021102b0 pending blocks:[]> sleep 0.1 # nothing will complete # => 0 [head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false] branch1.value # => 2 sleep 0.1 # forces only head to complete, branch 2 stays incomplete - # => 0 + # => 1 [head, branch1, branch2, join].map(&:completed?) # => [true, true, false, false] join.value # => [2, 2] @@ -125,14 +126,14 @@ # it'll be executed after 0.1 seconds scheduled = schedule(0.1) { 1 } - # => <#Concurrent::Promises::Future:0x7fc5caaae028 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa60288a810 pending blocks:[]> scheduled.completed? # => false scheduled.value # available after 0.1sec # => 1 # and in chain scheduled = delay { 1 }.schedule(0.1).then(&:succ) - # => <#Concurrent::Promises::Future:0x7fc5caa9f2d0 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa603101638 pending blocks:[]> # will not be scheduled until value is requested sleep 0.1 scheduled.value # returns after another 0.1sec # => 2 @@ -141,21 +142,21 @@ ### Completable Future and Event future = completable_future - # => <#Concurrent::Promises::CompletableFuture:0x7fc5caa8eae8 pending blocks:[]> -event = event() - # => <#Concurrent::Promises::CompletableEvent:0x7fc5caa8d648 pending blocks:[]> + # => <#Concurrent::Promises::CompletableFuture:0x7fa6030e0ac8 pending blocks:[]> +event = completable_event() + # => <#Concurrent::Promises::CompletableEvent:0x7fa6030e0a78 pending blocks:[]> # These threads will be blocked until the future and event is completed t1 = Thread.new { future.value } t2 = Thread.new { event.wait } future.success 1 - # => <#Concurrent::Promises::CompletableFuture:0x7fc5caa8eae8 success blocks:[]> + # => <#Concurrent::Promises::CompletableFuture:0x7fa6030e0ac8 success blocks:[]> future.success 1 rescue $! # => # future.try_success 2 # => false event.complete - # => <#Concurrent::Promises::CompletableEvent:0x7fc5caa8d648 completed blocks:[]> + # => <#Concurrent::Promises::CompletableEvent:0x7fa6030e0a78 success blocks:[]> # The threads can be joined now [t1, t2].each &:join @@ -163,14 +164,14 @@ ### Callbacks -queue = Queue.new # => # +queue = Queue.new # => # future = delay { 1 + 1 } - # => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]> future.on_success { queue << 1 } # evaluated asynchronously - # => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]> future.on_success! { queue << 2 } # evaluated on completing thread - # => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]> queue.empty? # => true future.value # => 2 @@ -188,7 +189,7 @@ # executed on executor for blocking and long operations then_on(:io) { File.read __FILE__ }. wait - # => <#Concurrent::Promises::Future:0x7fc5cb010b10 success blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa60307abb0 success blocks:[]> ### Interoperability with actors @@ -196,7 +197,7 @@ actor = Concurrent::Actor::Utils::AdHoc.spawn :square do -> v { v ** 2 } end - # => # + # => # future { 2 }. @@ -210,24 +211,24 @@ ### Interoperability with channels ch1 = Concurrent::Channel.new - # => #, @__condition__=#, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#> + # => #, @__condition__=#, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#> ch2 = Concurrent::Channel.new - # => #, @__condition__=#, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#> + # => #, @__condition__=#, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#> result = select(ch1, ch2) - # => <#Concurrent::Promises::Future:0x7fc5cc892e40 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa601a62d50 pending blocks:[]> ch1.put 1 # => true result.value! - # => [1, #, @__condition__=#, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#>] + # => [1, #, @__condition__=#, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#>] future { 1+1 }. then_put(ch1) - # => <#Concurrent::Promises::Future:0x7fc5cc87b920 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa601a51910 pending blocks:[]> result = future { '%02d' }. then_select(ch1, ch2). then { |format, (value, channel)| format format, value } - # => <#Concurrent::Promises::Future:0x7fc5cc862f60 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa601a2b008 pending blocks:[]> result.value! # => "02" @@ -235,7 +236,7 @@ # simple background processing future { do_stuff } - # => <#Concurrent::Promises::Future:0x7fc5cc080518 pending blocks:[]> + # => <#Concurrent::Promises::Future:0x7fa601a1b478 pending blocks:[]> # parallel background processing jobs = 10.times.map { |i| future { i } } @@ -243,18 +244,41 @@ # periodic task -DONE = Concurrent::AtomicBoolean.new false # => # - -def schedule_job - schedule(1) { do_stuff }. - rescue { |e| StandardError === e ? report_error(e) : raise(e) }. - then { schedule_job unless DONE.true? } +def schedule_job(interval, &job) + # schedule the first execution and chain restart og the job + Concurrent.schedule(interval, &job).chain do |success, continue, reason| + if success + schedule_job(interval, &job) if continue + else + # handle error + p reason + # retry + schedule_job(interval, &job) + end + end end # => :schedule_job -schedule_job - # => <#Concurrent::Promises::Future:0x7fc5ca9949d0 pending blocks:[]> -DONE.make_true # => true +queue = Queue.new # => # +count = 0 # => 0 + +schedule_job 0.05 do + queue.push count + count += 1 + # to continue scheduling return true, false will end the task + if count < 4 + # to continue scheduling return true + true + else + queue.push nil + # to end the task return false + false + end +end + # => <#Concurrent::Promises::Future:0x7fa6020c23d0 pending blocks:[]> +# read the queue +arr, v = [], nil; arr << v while (v = queue.pop) +arr # => [0, 1, 2, 3] # How to limit processing where there are limited resources? # By creating an actor managing the resource @@ -265,7 +289,7 @@ def schedule_job data[message] end end - # => # + # => # concurrent_jobs = 11.times.map do |v| @@ -295,7 +319,7 @@ def schedule_job end end end - # => # + # => # concurrent_jobs = 11.times.map do |v|