diff --git a/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java b/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java index b4601de..6b4b9fe 100644 --- a/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java +++ b/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java @@ -9,31 +9,18 @@ public interface AsyncIterator extends Iterable { - record Data(T data, boolean done, Throwable error) { - public Data(T data, boolean done) { - this(data, done, null ); - } - public Data(Throwable error) { - this(null, false, error ); - } - } + record Data(T data, boolean done) {} CompletableFuture> next(); default CompletableFuture forEachAsync( final AsyncFunction consumer) { return next().thenCompose(data -> { - if (data.error != null ) { - var error = new CompletableFuture(); - error.completeExceptionally(data.error); - return error; - } if (data.done) { return completedFuture(null); } return consumer.apply(data.data) .thenCompose( v -> forEachAsync(consumer) ); - }); } @@ -46,27 +33,19 @@ public boolean hasNext() { if (currentFetchedData.get() != null) { return false; } - - var next = currentFetchedData.updateAndGet( (v) -> AsyncIterator.this.next().join() ); - - return !next.done(); + return !currentFetchedData.updateAndGet( (v) -> AsyncIterator.this.next().join() ).done(); } @Override public T next() { - if (currentFetchedData.get() == null) { if( !hasNext() ) { throw new NoSuchElementException("no more elements into iterator"); } } - if (currentFetchedData.get().error() != null ) { - throw new IllegalStateException(currentFetchedData.get().error()); - } if (currentFetchedData.get().done()) { throw new NoSuchElementException("no more elements into iterator"); } - return currentFetchedData.getAndUpdate((v) -> null).data(); } }; diff --git a/src/main/java/org/bsc/langgraph4j/async/AsyncQueue.java b/src/main/java/org/bsc/langgraph4j/async/AsyncQueue.java index a2af80f..c7d6ffd 100644 --- a/src/main/java/org/bsc/langgraph4j/async/AsyncQueue.java +++ b/src/main/java/org/bsc/langgraph4j/async/AsyncQueue.java @@ -26,24 +26,20 @@ public AsyncQueue(Executor executor) { * @throws InterruptedException if interrupted while waiting for space to become available */ public void put(E e) throws InterruptedException { - if( exception.get() != null ) { - throw new IllegalStateException("Queue has been closed with exception!"); - } + Objects.requireNonNull(queue); queue.put(new Data<>(e, false)); } public void closeExceptionally(Throwable ex) { + Objects.requireNonNull(queue); exception.set(ex); } @Override public void close() throws Exception { - if (exception.get() != null) { - queue.put(new Data<>(exception.get())); - } - else { - queue.put(new Data<>(null, true)); - } + Objects.requireNonNull(queue); + queue.put(new Data<>(null, true)); + queue = null; } @@ -52,8 +48,8 @@ public CompletableFuture> next() { return CompletableFuture.supplyAsync( () -> { try { var result = queue.take(); - if( result.error() != null ) { - queue = null; + if( exception.get()!=null ) { + throw new RuntimeException(exception.get()); } return result; } catch (InterruptedException e) {