Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a noop, never-released ref counted constant #103931

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions libs/core/src/main/java/org/elasticsearch/core/RefCounted.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,27 @@ default void mustIncRef() {
assert false : AbstractRefCounted.ALREADY_CLOSED_MESSAGE;
incRef(); // throws an ISE
}

/**
* A noop implementation that always behaves as if it is referenced and cannot be released.
*/
RefCounted ALWAYS_REFERENCED = new RefCounted() {
@Override
public void incRef() {}

@Override
public boolean tryIncRef() {
return true;
}

@Override
public boolean decRef() {
return false;
}

@Override
public boolean hasReferences() {
return true;
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
*/
public final class ReleasableBytesReference implements RefCounted, Releasable, BytesReference {

public static final Releasable NO_OP = () -> {};

private static final ReleasableBytesReference EMPTY = new ReleasableBytesReference(BytesArray.EMPTY, NO_OP);
private static final ReleasableBytesReference EMPTY = new ReleasableBytesReference(BytesArray.EMPTY, RefCounted.ALWAYS_REFERENCED);

private final BytesReference delegate;
private final RefCounted refCounted;
Expand All @@ -50,7 +48,7 @@ public ReleasableBytesReference(BytesReference delegate, RefCounted refCounted)

public static ReleasableBytesReference wrap(BytesReference reference) {
assert reference instanceof ReleasableBytesReference == false : "use #retain() instead of #wrap() on a " + reference.getClass();
return reference.length() == 0 ? empty() : new ReleasableBytesReference(reference, NO_OP);
return reference.length() == 0 ? empty() : new ReleasableBytesReference(reference, ALWAYS_REFERENCED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testBigIntArrayLivesAfterReleasableIsDecremented() throws IOExceptio
BytesStreamOutput out = new BytesStreamOutput();
testData.writeTo(out);

ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes());
ReleasableBytesReference ref = wrapAsReleasable(out.bytes());

try (IntArray in = IntArray.readFrom(ref.streamInput())) {
ref.decRef();
Expand All @@ -90,7 +90,7 @@ public void testBigDoubleArrayLivesAfterReleasableIsDecremented() throws IOExcep
BytesStreamOutput out = new BytesStreamOutput();
testData.writeTo(out);

ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes());
ReleasableBytesReference ref = wrapAsReleasable(out.bytes());

try (DoubleArray in = DoubleArray.readFrom(ref.streamInput())) {
ref.decRef();
Expand All @@ -109,7 +109,7 @@ public void testBigLongArrayLivesAfterReleasableIsDecremented() throws IOExcepti
BytesStreamOutput out = new BytesStreamOutput();
testData.writeTo(out);

ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes());
ReleasableBytesReference ref = wrapAsReleasable(out.bytes());

try (LongArray in = LongArray.readFrom(ref.streamInput())) {
ref.decRef();
Expand All @@ -128,7 +128,7 @@ public void testBigByteArrayLivesAfterReleasableIsDecremented() throws IOExcepti
BytesStreamOutput out = new BytesStreamOutput();
testData.writeTo(out);

ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes());
ReleasableBytesReference ref = wrapAsReleasable(out.bytes());

try (ByteArray in = ByteArray.readFrom(ref.streamInput())) {
ref.decRef();
Expand All @@ -140,4 +140,7 @@ public void testBigByteArrayLivesAfterReleasableIsDecremented() throws IOExcepti
assertThat(ref.hasReferences(), equalTo(false));
}

public static ReleasableBytesReference wrapAsReleasable(BytesReference bytesReference) {
return new ReleasableBytesReference(bytesReference, () -> {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.function.Predicate;

import static org.elasticsearch.common.bytes.ReleasableBytesReferenceStreamInputTests.wrapAsReleasable;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand Down Expand Up @@ -63,20 +64,20 @@ public void testInboundAggregation() throws IOException {
BytesArray bytes = new BytesArray(randomByteArrayOfLength(10));
ArrayList<ReleasableBytesReference> references = new ArrayList<>();
if (randomBoolean()) {
final ReleasableBytesReference content = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference content = wrapAsReleasable(bytes);
references.add(content);
aggregator.aggregate(content);
content.close();
} else {
final ReleasableBytesReference content1 = ReleasableBytesReference.wrap(bytes.slice(0, 3));
final ReleasableBytesReference content1 = wrapAsReleasable(bytes.slice(0, 3));
references.add(content1);
aggregator.aggregate(content1);
content1.close();
final ReleasableBytesReference content2 = ReleasableBytesReference.wrap(bytes.slice(3, 3));
final ReleasableBytesReference content2 = wrapAsReleasable(bytes.slice(3, 3));
references.add(content2);
aggregator.aggregate(content2);
content2.close();
final ReleasableBytesReference content3 = ReleasableBytesReference.wrap(bytes.slice(6, 4));
final ReleasableBytesReference content3 = wrapAsReleasable(bytes.slice(6, 4));
references.add(content3);
aggregator.aggregate(content3);
content3.close();
Expand Down Expand Up @@ -108,7 +109,7 @@ public void testInboundUnknownAction() throws IOException {
aggregator.headerReceived(header);

BytesArray bytes = new BytesArray(randomByteArrayOfLength(10));
final ReleasableBytesReference content = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference content = wrapAsReleasable(bytes);
aggregator.aggregate(content);
content.close();
assertFalse(content.hasReferences());
Expand Down Expand Up @@ -137,7 +138,7 @@ public void testCircuitBreak() throws IOException {
aggregator.headerReceived(breakableHeader);

BytesArray bytes = new BytesArray(randomByteArrayOfLength(10));
final ReleasableBytesReference content1 = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference content1 = wrapAsReleasable(bytes);
aggregator.aggregate(content1);
content1.close();

Expand All @@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException {
// Initiate Message
aggregator.headerReceived(unbreakableHeader);

final ReleasableBytesReference content2 = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference content2 = wrapAsReleasable(bytes);
aggregator.aggregate(content2);
content2.close();

Expand All @@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException {
// Initiate Message
aggregator.headerReceived(handshakeHeader);

final ReleasableBytesReference content3 = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference content3 = wrapAsReleasable(bytes);
aggregator.aggregate(content3);
content3.close();

Expand All @@ -203,16 +204,16 @@ public void testCloseWillCloseContent() {
BytesArray bytes = new BytesArray(randomByteArrayOfLength(10));
ArrayList<ReleasableBytesReference> references = new ArrayList<>();
if (randomBoolean()) {
final ReleasableBytesReference content = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference content = wrapAsReleasable(bytes);
references.add(content);
aggregator.aggregate(content);
content.close();
} else {
final ReleasableBytesReference content1 = ReleasableBytesReference.wrap(bytes.slice(0, 5));
final ReleasableBytesReference content1 = wrapAsReleasable(bytes.slice(0, 5));
references.add(content1);
aggregator.aggregate(content1);
content1.close();
final ReleasableBytesReference content2 = ReleasableBytesReference.wrap(bytes.slice(5, 5));
final ReleasableBytesReference content2 = wrapAsReleasable(bytes.slice(5, 5));
references.add(content2);
aggregator.aggregate(content2);
content2.close();
Expand Down Expand Up @@ -243,7 +244,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
streamOutput.writeString(actionName);
streamOutput.write(randomByteArrayOfLength(10));

final ReleasableBytesReference content = ReleasableBytesReference.wrap(streamOutput.bytes());
final ReleasableBytesReference content = wrapAsReleasable(streamOutput.bytes());
aggregator.aggregate(content);
content.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.ArrayList;

import static org.elasticsearch.common.bytes.ReleasableBytesReferenceStreamInputTests.wrapAsReleasable;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void testDecode() throws IOException {

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
final ReleasableBytesReference releasable1 = wrapAsReleasable(totalBytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(totalHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());
Expand All @@ -104,7 +105,7 @@ public void testDecode() throws IOException {
fragments.clear();

final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2);
int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);

Expand Down Expand Up @@ -146,7 +147,7 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException {

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
final ReleasableBytesReference releasable1 = wrapAsReleasable(totalBytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(partialHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());
Expand All @@ -165,7 +166,7 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException {
fragments.clear();

final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2);
int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
if (compressionScheme == null) {
assertEquals(2, fragments.size());
Expand Down Expand Up @@ -203,7 +204,7 @@ public void testDecodeHandshakeCompatibility() throws IOException {

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(totalHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());
Expand Down Expand Up @@ -249,14 +250,14 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
try (InboundDecoder clientDecoder = new InboundDecoder(recycler, ChannelType.CLIENT)) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> clientDecoder.decode(ReleasableBytesReference.wrap(bytes), ignored -> {})
() -> clientDecoder.decode(wrapAsReleasable(bytes), ignored -> {})
);
assertThat(e.getMessage(), containsString("client channels do not accept inbound requests, only responses"));
}
// the same message will be decoded by a server or mixed decoder
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.SERVER, ChannelType.MIX))) {
final ArrayList<Object> fragments = new ArrayList<>();
int bytesConsumed = decoder.decode(ReleasableBytesReference.wrap(bytes), fragments::add);
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + bytes.getInt(
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
);
Expand Down Expand Up @@ -291,14 +292,14 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference bytes = message.serialize(os);
try (InboundDecoder decoder = new InboundDecoder(recycler, ChannelType.SERVER)) {
final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> decoder.decode(releasable1, ignored -> {}));
assertThat(e.getMessage(), containsString("server channels do not accept inbound responses, only requests"));
}
// the same message will be decoded by a client or mixed decoder
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.CLIENT, ChannelType.MIX))) {
final ArrayList<Object> fragments = new ArrayList<>();
int bytesConsumed = decoder.decode(ReleasableBytesReference.wrap(bytes), fragments::add);
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + bytes.getInt(
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
);
Expand Down Expand Up @@ -350,7 +351,7 @@ public void testCompressedDecode() throws IOException {

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
final ReleasableBytesReference releasable1 = wrapAsReleasable(totalBytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(totalHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());
Expand All @@ -372,7 +373,7 @@ public void testCompressedDecode() throws IOException {
fragments.clear();

final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2);
int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);

Expand Down Expand Up @@ -414,7 +415,7 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException {

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(totalHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());
Expand Down Expand Up @@ -451,7 +452,7 @@ public void testVersionIncompatibilityDecodeException() throws IOException {

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
try (ReleasableBytesReference r = ReleasableBytesReference.wrap(bytes)) {
try (ReleasableBytesReference r = wrapAsReleasable(bytes)) {
releasable1 = r;
expectThrows(IllegalStateException.class, () -> decoder.decode(releasable1, fragments::add));
}
Expand Down