Skip to content

Commit

Permalink
More documentation for Throttle and other minor improvmenets
Browse files Browse the repository at this point in the history
  • Loading branch information
pitr-ch committed Dec 23, 2016
1 parent 9d7b3cb commit 41cf14d
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 87 deletions.
4 changes: 2 additions & 2 deletions doc/promises.in.md
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,12 @@ pool of size 3. We create throttle with the same size
```ruby
DB_INTERNAL_POOL = Concurrent::Array.new data

max_tree = Promises::Throttle.new 3
max_tree = Concurrent::Throttle.new 3

futures = 11.times.map do |i|
max_tree.
# throttled tasks, at most 3 simultaneous calls of [] on the database
then_throttled { DB_INTERNAL_POOL[i] }.
throttled_future { DB_INTERNAL_POOL[i] }.
# un-throttled tasks, unlimited concurrency
then { |starts| starts.size }.
rescue { |reason| reason.message }
Expand Down
106 changes: 53 additions & 53 deletions doc/promises.out.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ Class.new do
resolvable_event
end
end.new.a_method
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca52488 pending>
# => <#Concurrent::Promises::ResolvableEvent:0x7fb4ba3e8c78 pending>

Module.new { extend Concurrent::Promises::FactoryMethods }.resolvable_event
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca4a9e0 pending>
# => <#Concurrent::Promises::ResolvableEvent:0x7fb4ba3e2850 pending>
```

The module is already extended into {Concurrent::Promises} for convenience.

```ruby
Concurrent::Promises.resolvable_event
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca48708 pending>
# => <#Concurrent::Promises::ResolvableEvent:0x7fb4ba3e08e8 pending>
```

For this guide we introduce a shortcut in `main` so we can call the factory
Expand All @@ -66,7 +66,7 @@ methods in following examples by using `Promisses` directly.
```ruby
Promises = Concurrent::Promises
Promises.resolvable_event
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca41d18 pending>
# => <#Concurrent::Promises::ResolvableEvent:0x7fb4ba3d8be8 pending>
```

## Asynchronous task
Expand All @@ -82,7 +82,7 @@ future = Promises.future(0.1) do |duration|
sleep duration
:result
end
# => <#Concurrent::Promises::Future:0x7fa95b3eb410 pending>
# => <#Concurrent::Promises::Future:0x7fb4ba3c8248 pending>
```

Asks if the future is resolved, here it will be still in the middle of the
Expand All @@ -103,7 +103,7 @@ If the task fails we talk about the future being rejected.

```ruby
future = Promises.future { raise 'Boom' }
# => <#Concurrent::Promises::Future:0x7fa95b3dae58 pending>
# => <#Concurrent::Promises::Future:0x7fb4ba3b1bd8 pending>
```

There is no result, the future was rejected with a reason.
Expand Down Expand Up @@ -198,20 +198,20 @@ through evaluation as follows.

```ruby
Promises.future { :value }
# => <#Concurrent::Promises::Future:0x7fa95b89f510 pending>
# => <#Concurrent::Promises::Future:0x7fb4ba322230 pending>
```

Instead it can be created directly.

```ruby
Promises.fulfilled_future(:value)
# => <#Concurrent::Promises::Future:0x7fa95b390bc8 fulfilled>
# => <#Concurrent::Promises::Future:0x7fb4ba31a648 fulfilled>
Promises.rejected_future(StandardError.new('Ups'))
# => <#Concurrent::Promises::Future:0x7fa95b38b628 rejected>
# => <#Concurrent::Promises::Future:0x7fb4ba319298 rejected>
Promises.resolved_future(true, :value, nil)
# => <#Concurrent::Promises::Future:0x7fa95b38a688 fulfilled>
# => <#Concurrent::Promises::Future:0x7fb4ba3133e8 fulfilled>
Promises.resolved_future(false, nil, StandardError.new('Ups'))
# => <#Concurrent::Promises::Future:0x7fa95b3892b0 rejected>
# => <#Concurrent::Promises::Future:0x7fb4ba311700 rejected>
```

## Chaining
Expand Down Expand Up @@ -252,9 +252,9 @@ do_stuff arg }`) is **required**, both following examples may break.
```ruby
arg = 1 # => 1
Thread.new { do_stuff arg }
# => #<Thread:0x007fa95b322fd8@promises.in.md:193 run>
# => #<Thread:0x007fb4ba2a2710@promises.in.md:193 run>
Promises.future { do_stuff arg }
# => <#Concurrent::Promises::Future:0x7fa95b321020 pending>
# => <#Concurrent::Promises::Future:0x7fb4ba2a0dc0 pending>
```

## Branching, and zipping
Expand Down Expand Up @@ -316,7 +316,7 @@ Promises.
result
# => [false,
# nil,
# #<NoMethodError: undefined method `succ' for #<Object:0x007fa95ca28368>>]
# #<NoMethodError: undefined method `succ' for #<Object:0x007fb4ba2310d8>>]
```

As `then` chained tasks execute only on fulfilled futures, there is a `rescue`
Expand Down Expand Up @@ -364,7 +364,7 @@ Zip is rejected if any of the zipped futures is.
rejected_zip = Promises.zip(
Promises.fulfilled_future(1),
Promises.rejected_future(StandardError.new('Ups')))
# => <#Concurrent::Promises::Future:0x7fa95b2428e8 rejected>
# => <#Concurrent::Promises::Future:0x7fb4bc332390 rejected>
rejected_zip.result
# => [false, [1, nil], [nil, #<StandardError: Ups>]]
rejected_zip.
Expand All @@ -379,11 +379,11 @@ requiring resolution.

```ruby
future = Promises.delay { sleep 0.1; 'lazy' }
# => <#Concurrent::Promises::Future:0x7fa95b229eb0 pending>
# => <#Concurrent::Promises::Future:0x7fb4bc3188f0 pending>
sleep 0.1
future.resolved? # => false
future.touch
# => <#Concurrent::Promises::Future:0x7fa95b229eb0 pending>
# => <#Concurrent::Promises::Future:0x7fb4bc3188f0 pending>
sleep 0.2
future.resolved? # => true
```
Expand Down Expand Up @@ -460,7 +460,7 @@ Schedule task to be executed in 0.1 seconds.

```ruby
scheduled = Promises.schedule(0.1) { 1 }
# => <#Concurrent::Promises::Future:0x7fa95c9b1ce0 pending>
# => <#Concurrent::Promises::Future:0x7fb4bc288958 pending>
scheduled.resolved? # => false
```

Expand All @@ -485,7 +485,7 @@ Time can be used as well.

```ruby
Promises.schedule(Time.now + 10) { :val }
# => <#Concurrent::Promises::Future:0x7fa95c972ae0 pending>
# => <#Concurrent::Promises::Future:0x7fb4bc252330 pending>
```

## Resolvable Future and Event:
Expand All @@ -497,15 +497,15 @@ Sometimes it is required to resolve a future externally, in these cases

```ruby
future = Promises.resolvable_future
# => <#Concurrent::Promises::ResolvableFuture:0x7fa95c970e98 pending>
# => <#Concurrent::Promises::ResolvableFuture:0x7fb4bc250620 pending>
```

The thread will be blocked until the future is resolved

```ruby
thread = Thread.new { future.value }
future.fulfill 1
# => <#Concurrent::Promises::ResolvableFuture:0x7fa95c970e98 fulfilled>
# => <#Concurrent::Promises::ResolvableFuture:0x7fb4bc250620 fulfilled>
thread.value # => 1
```

Expand All @@ -522,9 +522,9 @@ future.fulfill 2, false # => false
## Callbacks

```ruby
queue = Queue.new # => #<Thread::Queue:0x007fa95e0565b8>
queue = Queue.new # => #<Thread::Queue:0x007fb4bb9d5e78>
future = Promises.delay { 1 + 1 }
# => <#Concurrent::Promises::Future:0x7fa95e0547b8 pending>
# => <#Concurrent::Promises::Future:0x7fb4bb9d4730 pending>

future.on_fulfillment { queue << 1 } # evaluated asynchronously
future.on_fulfillment! { queue << 2 } # evaluated on resolving thread
Expand All @@ -547,7 +547,7 @@ and `:io` for blocking and long tasks.
```ruby
Promises.future_on(:fast) { 2 }.
then_on(:io) { File.read __FILE__ }.
value.size # => 18760
value.size # => 18764
```

# Interoperability
Expand All @@ -560,7 +560,7 @@ Create an actor which takes received numbers and returns the number squared.
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
-> v { v ** 2 }
end
# => #<Concurrent::Actor::Reference:0x7fa95c919e90 /square (Concurrent::Actor::Utils::AdHoc)>
# => #<Concurrent::Actor::Reference:0x7fb4bb986670 /square (Concurrent::Actor::Utils::AdHoc)>
```

Send result of `1+1` to the actor, and add 2 to the result send back from the
Expand Down Expand Up @@ -592,17 +592,17 @@ actor.ask(2).then(&:succ).value! # => 5

```ruby
Promises.future { do_stuff }
# => <#Concurrent::Promises::Future:0x7fa95b1eb8e0 pending>
# => <#Concurrent::Promises::Future:0x7fb4bb947740 pending>
```

## Parallel background processing

```ruby
tasks = 4.times.map { |i| Promises.future(i) { |i| i*2 } }
# => [<#Concurrent::Promises::Future:0x7fa95b1e2e48 pending>,
# <#Concurrent::Promises::Future:0x7fa95b1e1f70 pending>,
# <#Concurrent::Promises::Future:0x7fa95b1e1188 pending>,
# <#Concurrent::Promises::Future:0x7fa95b1e0198 pending>]
# => [<#Concurrent::Promises::Future:0x7fb4bb93f090 pending>,
# <#Concurrent::Promises::Future:0x7fb4bb93e488 pending>,
# <#Concurrent::Promises::Future:0x7fb4bb93d6f0 pending>,
# <#Concurrent::Promises::Future:0x7fb4bb93c778 pending>]
Promises.zip(*tasks).value! # => [0, 2, 4, 6]
```

Expand Down Expand Up @@ -655,11 +655,11 @@ Create the computer actor and send it 3 jobs.

```ruby
computer = Concurrent::Actor.spawn Computer, :computer
# => #<Concurrent::Actor::Reference:0x7fa95a3f2c48 /computer (Computer)>
# => #<Concurrent::Actor::Reference:0x7fb4ba9b3388 /computer (Computer)>
results = 3.times.map { computer.ask [:run, -> { sleep 0.1; :result }] }
# => [<#Concurrent::Promises::Future:0x7fa95a3d30c8 pending>,
# <#Concurrent::Promises::Future:0x7fa95a3d0e40 pending>,
# <#Concurrent::Promises::Future:0x7fa95a3cb760 pending>]
# => [<#Concurrent::Promises::Future:0x7fb4ba990450 pending>,
# <#Concurrent::Promises::Future:0x7fb4ba9897e0 pending>,
# <#Concurrent::Promises::Future:0x7fb4ba988318 pending>]
computer.ask(:status).value! # => {:running_jobs=>3}
results.map(&:value!) # => [:result, :result, :result]
```
Expand Down Expand Up @@ -706,8 +706,8 @@ Lets have two processes which will count until cancelled.

```ruby
source, token = Concurrent::Cancellation.create
# => [<#Concurrent::Cancellation:0x7fa95a223840 canceled:false>,
# <#Concurrent::Cancellation::Token:0x7fa95a2222b0 canceled:false>]
# => [<#Concurrent::Cancellation:0x7fb4ba1bc300 canceled:false>,
# <#Concurrent::Cancellation::Token:0x7fb4ba1b7670 canceled:false>]

count_until_cancelled = -> token, count do
if token.canceled?
Expand All @@ -720,12 +720,12 @@ end
futures = Array.new(2) do
Promises.future(token, 0, &count_until_cancelled).run
end
# => [<#Concurrent::Promises::Future:0x7fa95b1b8a08 pending>,
# <#Concurrent::Promises::Future:0x7fa95b1aa110 pending>]
# => [<#Concurrent::Promises::Future:0x7fb4ba13d578 pending>,
# <#Concurrent::Promises::Future:0x7fb4ba13c308 pending>]

sleep 0.01
source.cancel # => true
futures.map(&:value!) # => [65, 66]
futures.map(&:value!) # => [35, 34]
```

Cancellation can also be used as event or future to log or plan re-execution.
Expand All @@ -744,8 +744,8 @@ tasks share a cancellation, when one of them fails it cancels the others.

```ruby
source, token = Concurrent::Cancellation.create
# => [<#Concurrent::Cancellation:0x7fa95c9c8c60 canceled:false>,
# <#Concurrent::Cancellation::Token:0x7fa95c9c8710 canceled:false>]
# => [<#Concurrent::Cancellation:0x7fb4ba053130 canceled:false>,
# <#Concurrent::Cancellation::Token:0x7fb4ba051fb0 canceled:false>]
tasks = 4.times.map do |i|
Promises.future(source, token, i) do |source, token, i|
count = 0
Expand All @@ -761,22 +761,22 @@ tasks = 4.times.map do |i|
end
end
end
# => [<#Concurrent::Promises::Future:0x7fa95c9a1818 pending>,
# <#Concurrent::Promises::Future:0x7fa95c9a0aa8 pending>,
# <#Concurrent::Promises::Future:0x7fa95c98bb30 pending>,
# <#Concurrent::Promises::Future:0x7fa95c98aed8 pending>]
# => [<#Concurrent::Promises::Future:0x7fb4ba03b0f8 pending>,
# <#Concurrent::Promises::Future:0x7fb4ba0397a8 pending>,
# <#Concurrent::Promises::Future:0x7fb4ba038308 pending>,
# <#Concurrent::Promises::Future:0x7fb4ba0332b8 pending>]
Promises.zip(*tasks).result
# => [false,
# [nil, :cancelled, :cancelled, nil],
# [#<RuntimeError: random error>, nil, nil, #<RuntimeError: random error>]]
# [:cancelled, :cancelled, nil, :cancelled],
# [nil, nil, #<RuntimeError: random error>, nil]]
```

Without the randomly failing part it produces following.

```ruby
source, token = Concurrent::Cancellation.create
# => [<#Concurrent::Cancellation:0x7fa95ca897d0 canceled:false>,
# <#Concurrent::Cancellation::Token:0x7fa95ca83c68 canceled:false>]
# => [<#Concurrent::Cancellation:0x7fb4bb9ee0b8 canceled:false>,
# <#Concurrent::Cancellation::Token:0x7fb4bb9ed9d8 canceled:false>]
tasks = 4.times.map do |i|
Promises.future(source, token, i) do |source, token, i|
count = 0
Expand Down Expand Up @@ -884,13 +884,13 @@ DB_INTERNAL_POOL = Concurrent::Array.new data
# "********",
# "*********"]

max_tree = Promises::Throttle.new 3
# => <#Concurrent::Promises::Throttle:0x7fa95b2d0cb0 limit:3 can_run:3>
max_tree = Concurrent::Throttle.new 3
# => <#Concurrent::Throttle:0x7fb4ba2f22d8 limit:3 can_run:3>

futures = 11.times.map do |i|
max_tree.
# throttled tasks, at most 3 simultaneous calls of [] on the database
then_throttled { DB_INTERNAL_POOL[i] }.
throttled_future { DB_INTERNAL_POOL[i] }.
# un-throttled tasks, unlimited concurrency
then { |starts| starts.size }.
rescue { |reason| reason.message }
Expand Down Expand Up @@ -927,7 +927,7 @@ def schedule_job(interval, &job)
end
end

queue = Queue.new # => #<Thread::Queue:0x007fa95e08d838>
queue = Queue.new # => #<Thread::Queue:0x007fb4ba201568>
count = 0 # => 0
interval = 0.05 # small just not to delay execution of this example

Expand Down
1 change: 1 addition & 0 deletions lib/concurrent-edge.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@

require 'concurrent/edge/promises'
require 'concurrent/edge/cancellation'
require 'concurrent/edge/throttle'
2 changes: 1 addition & 1 deletion lib/concurrent/edge/lock_free_linked_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def remove(item)
#
# An iterator to loop through the set.
#
# @yield [Object] each item in the set
# @yield [item] each item in the set
# @yieldparam [Object] item the item you to remove from the set
#
# @return [Object] self: the linked set on which each was called
Expand Down
Loading

0 comments on commit 41cf14d

Please sign in to comment.