Skip to content

Commit

Permalink
refactor: finalize AsyncIterator support
Browse files Browse the repository at this point in the history
experimental feature
  • Loading branch information
bsorrentino committed Mar 25, 2024
1 parent 19b43fd commit f404e50
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 34 deletions.
25 changes: 2 additions & 23 deletions src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,18 @@

public interface AsyncIterator<T> extends Iterable<T> {

record Data<T>(T data, boolean done, Throwable error) {
public Data(T data, boolean done) {
this(data, done, null );
}
public Data(Throwable error) {
this(null, false, error );
}
}
record Data<T>(T data, boolean done) {}

CompletableFuture<Data<T>> next();

default CompletableFuture<Void> forEachAsync( final AsyncFunction<T,Void> consumer) {

return next().thenCompose(data -> {
if (data.error != null ) {
var error = new CompletableFuture<Void>();
error.completeExceptionally(data.error);
return error;
}
if (data.done) {
return completedFuture(null);
}
return consumer.apply(data.data)
.thenCompose( v -> forEachAsync(consumer) );

});
}

Expand All @@ -46,27 +33,19 @@ public boolean hasNext() {
if (currentFetchedData.get() != null) {
return false;
}

var next = currentFetchedData.updateAndGet( (v) -> AsyncIterator.this.next().join() );

return !next.done();
return !currentFetchedData.updateAndGet( (v) -> AsyncIterator.this.next().join() ).done();
}

@Override
public T next() {

if (currentFetchedData.get() == null) {
if( !hasNext() ) {
throw new NoSuchElementException("no more elements into iterator");
}
}
if (currentFetchedData.get().error() != null ) {
throw new IllegalStateException(currentFetchedData.get().error());
}
if (currentFetchedData.get().done()) {
throw new NoSuchElementException("no more elements into iterator");
}

return currentFetchedData.getAndUpdate((v) -> null).data();
}
};
Expand Down
18 changes: 7 additions & 11 deletions src/main/java/org/bsc/langgraph4j/async/AsyncQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,20 @@ public AsyncQueue(Executor executor) {
* @throws InterruptedException if interrupted while waiting for space to become available
*/
public void put(E e) throws InterruptedException {
if( exception.get() != null ) {
throw new IllegalStateException("Queue has been closed with exception!");
}
Objects.requireNonNull(queue);
queue.put(new Data<>(e, false));
}

public void closeExceptionally(Throwable ex) {
Objects.requireNonNull(queue);
exception.set(ex);
}

@Override
public void close() throws Exception {
if (exception.get() != null) {
queue.put(new Data<>(exception.get()));
}
else {
queue.put(new Data<>(null, true));
}
Objects.requireNonNull(queue);
queue.put(new Data<>(null, true));
queue = null;
}


Expand All @@ -52,8 +48,8 @@ public CompletableFuture<Data<E>> next() {
return CompletableFuture.supplyAsync( () -> {
try {
var result = queue.take();
if( result.error() != null ) {
queue = null;
if( exception.get()!=null ) {
throw new RuntimeException(exception.get());
}
return result;
} catch (InterruptedException e) {
Expand Down

0 comments on commit f404e50

Please sign in to comment.