Skip to content

Commit

Permalink
Documentation and hiding constants
Browse files Browse the repository at this point in the history
  • Loading branch information
pitr-ch committed Jun 13, 2016
1 parent dc521fb commit 23d3cdf
Showing 1 changed file with 55 additions and 42 deletions.
97 changes: 55 additions & 42 deletions lib/concurrent/edge/promises.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def future(*args, &task)
future_on(:io, *args, &task)
end

# As {#future} but takes default_executor as first argument
def future_on(default_executor, *args, &task)
ImmediateEventPromise.new(default_executor).future.then(*args, &task)
end
Expand Down Expand Up @@ -68,6 +69,7 @@ def delay(*args, &task)
delay_on :io, *args, &task
end

# As {#delay} but takes default_executor as first argument
def delay_on(default_executor, *args, &task)
DelayPromise.new(default_executor).future.then(*args, &task)
end
Expand All @@ -79,6 +81,7 @@ def schedule(intended_time, *args, &task)
schedule_on :io, intended_time, *args, &task
end

# As {#schedule} but takes default_executor as first argument
def schedule_on(default_executor, intended_time, *args, &task)
ScheduledPromise.new(default_executor, intended_time).future.then(*args, &task)
end
Expand All @@ -92,6 +95,7 @@ def zip_futures(*futures_and_or_events)
zip_futures_on :io, *futures_and_or_events
end

# As {#zip_futures} but takes default_executor as first argument
def zip_futures_on(default_executor, *futures_and_or_events)
ZipFuturesPromise.new(futures_and_or_events, default_executor).future
end
Expand All @@ -106,6 +110,7 @@ def zip_events(*futures_and_or_events)
zip_events_on :io, *futures_and_or_events
end

# As {#zip_events} but takes default_executor as first argument
def zip_events_on(default_executor, *futures_and_or_events)
ZipEventsPromise.new(futures_and_or_events, default_executor).future
end
Expand All @@ -117,6 +122,7 @@ def any_complete_future(*futures)
any_complete_future_on :io, *futures
end

# As {#any_complete_future} but takes default_executor as first argument
def any_complete_future_on(default_executor, *futures)
AnyCompleteFuturePromise.new(futures, default_executor).future
end
Expand All @@ -131,14 +137,17 @@ def any_successful_future(*futures)
any_successful_future_on :io, *futures
end

# As {#any_succesful_future} but takes default_executor as first argument
def any_successful_future_on(default_executor, *futures)
AnySuccessfulFuturePromise.new(futures, default_executor).future
end

# Constructs new {Event} which becomes complete after first if the events completes.
def any_event(*events)
any_event_on :io, *events
end

# As {#any_event} but takes default_executor as first argument
def any_event_on(default_executor, *events)
AnyCompleteEventPromise.new(events, default_executor).event
end
Expand All @@ -147,14 +156,7 @@ def any_event_on(default_executor, *events)
# TODO consider adding zip_by(slice, *futures) processing futures in slices
end

# Represents an event which will happen in future (will be completed). It has to always happen.
class Event < Synchronization::Object
safe_initialization!
private(*attr_atomic(:internal_state))
# @!visibility private
public :internal_state
include Concern::Logging

module InternalStates
class State
def completed?
raise NotImplementedError
Expand Down Expand Up @@ -238,13 +240,17 @@ def to_sym
end
end

private_constant :Success

# @!visibility private
class SuccessArray < Success
def apply(args, block)
block.call(*value, *args)
end
end

private_constant :SuccessArray

# @!visibility private
class Failed < CompletedWithResult
def initialize(reason)
Expand Down Expand Up @@ -272,6 +278,8 @@ def apply(args, block)
end
end

private_constant :Failed

# @!visibility private
class PartiallyFailed < CompletedWithResult
def initialize(value, reason)
Expand Down Expand Up @@ -301,12 +309,26 @@ def apply(args, block)
end
end

private_constant :PartiallyFailed

# @!visibility private
PENDING = Pending.new
# @!visibility private
COMPLETED = Success.new(nil)

private_constant :PENDING, :COMPLETED
end

private_constant :InternalStates

# Represents an event which will happen in future (will be completed). It has to always happen.
class Event < Synchronization::Object
safe_initialization!
private(*attr_atomic(:internal_state))
# @!visibility private
public :internal_state

include Concern::Logging
include InternalStates

def initialize(promise, default_executor)
super()
@Lock = Mutex.new
Expand Down Expand Up @@ -910,6 +932,7 @@ def with_hidden_completable
# @abstract
class AbstractPromise < Synchronization::Object
safe_initialization!
include InternalStates
include Concern::Logging

def initialize(future)
Expand Down Expand Up @@ -950,12 +973,12 @@ def complete_with(new_state, raise_on_reassign = true)

# @return [Future]
def evaluate_to(*args, block)
complete_with Future::Success.new(block.call(*args))
complete_with Success.new(block.call(*args))
rescue StandardError => error
complete_with Future::Failed.new(error)
complete_with Failed.new(error)
rescue Exception => error
log(ERROR, 'Promises::Future', error)
complete_with Future::Failed.new(error)
complete_with Failed.new(error)
end
end

Expand All @@ -970,35 +993,24 @@ def initialize(default_executor)
super CompletableFuture.new(self, default_executor)
end

# Set the `Future` to a value and wake or notify all threads waiting on it.
#
# @param [Object] value the value to store in the `Future`
# @raise [Concurrent::MultipleAssignmentError] if the `Future` has already been set or otherwise completed
# @return [Future]
def success(value)
complete_with Future::Success.new(value)
complete_with Success.new(value)
end

def try_success(value)
!!complete_with(Future::Success.new(value), false)
!!complete_with(Success.new(value), false)
end

# Set the `Future` to failed due to some error and wake or notify all threads waiting on it.
#
# @param [Object] reason for the failure
# @raise [Concurrent::MultipleAssignmentError] if the `Future` has already been set or otherwise completed
# @return [Future]
def fail(reason = StandardError.new)
complete_with Future::Failed.new(reason)
complete_with Failed.new(reason)
end

def try_fail(reason = StandardError.new)
!!complete_with(Future::Failed.new(reason), false)
!!complete_with(Failed.new(reason), false)
end

public :evaluate_to

# @return [Future]
def evaluate_to!(*args, block)
evaluate_to(*args, block).wait!
end
Expand Down Expand Up @@ -1149,14 +1161,14 @@ def on_completable(done_future)
# will be immediately completed
class ImmediateEventPromise < InnerPromise
def initialize(default_executor)
super Event.new(self, default_executor).complete_with(Event::COMPLETED)
super Event.new(self, default_executor).complete_with(COMPLETED)
end
end

class ImmediateFuturePromise < InnerPromise
def initialize(default_executor, success, value, reason)
super Future.new(self, default_executor).
complete_with(success ? Future::Success.new(value) : Future::Failed.new(reason))
complete_with(success ? Success.new(value) : Failed.new(reason))
end
end

Expand Down Expand Up @@ -1223,7 +1235,7 @@ def initialize(event1, event2, default_executor)
end

def on_completable(done_future)
complete_with Event::COMPLETED
complete_with COMPLETED
end
end

Expand All @@ -1250,9 +1262,9 @@ def on_completable(done_future)
success2, value2, reason2 = @Future2Result.result
success = success1 && success2
new_state = if success
Future::SuccessArray.new([value1, value2])
SuccessArray.new([value1, value2])
else
Future::PartiallyFailed.new([value1, value2], [reason1, reason2])
PartiallyFailed.new([value1, value2], [reason1, reason2])
end
complete_with new_state
end
Expand All @@ -1264,7 +1276,7 @@ def initialize(event, default_executor)
end

def on_completable(done_future)
complete_with Event::COMPLETED
complete_with COMPLETED
end
end

Expand Down Expand Up @@ -1303,9 +1315,9 @@ def on_completable(done_future)
end

if all_success
complete_with Future::SuccessArray.new(values)
complete_with SuccessArray.new(values)
else
complete_with Future::PartiallyFailed.new(values, reasons)
complete_with PartiallyFailed.new(values, reasons)
end
end
end
Expand All @@ -1321,10 +1333,11 @@ def initialize(blocked_by_futures, default_executor)
end

def on_completable(done_future)
complete_with Event::COMPLETED
complete_with COMPLETED
end
end

# @abstract
class AbstractAnyPromise < BlockedPromise
def touch
blocked_by.each(&:touch) unless @Future.completed?
Expand Down Expand Up @@ -1361,7 +1374,7 @@ def completable?(countdown, future)
end

def on_completable(done_future)
complete_with Event::COMPLETED, false
complete_with COMPLETED, false
end
end

Expand All @@ -1370,14 +1383,15 @@ class AnySuccessfulFuturePromise < AnyCompleteFuturePromise
private

def completable?(countdown, future)
future.success? || super(countdown, future)
future.success? ||
# inlined super from BlockedPromise
countdown.zero?
end
end

class DelayPromise < InnerPromise
def touch
@Future.complete_with Event::COMPLETED
@Future.complete_with COMPLETED
end

private
Expand All @@ -1387,7 +1401,6 @@ def initialize(default_executor)
end
end

# will be evaluated to task in intended_time
class ScheduledPromise < InnerPromise
def intended_time
@IntendedTime
Expand Down Expand Up @@ -1415,7 +1428,7 @@ def initialize(default_executor, intended_time)
end

Concurrent.global_timer_set.post(in_seconds) do
@Future.complete_with Event::COMPLETED
@Future.complete_with COMPLETED
end
end
end
Expand Down

0 comments on commit 23d3cdf

Please sign in to comment.