Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [log] Do not print error log if tenant/namespace does not exist when calling get topic metadata #23291

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -672,8 +674,6 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId);
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
topicName, ex.getMessage(), ex);
ServerError error = ServerError.ServiceNotReady;
if (ex instanceof MetadataStoreException) {
error = ServerError.MetadataError;
Expand All @@ -685,6 +685,14 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
error = ServerError.MetadataError;
}
}
if (error == ServerError.TopicNotFound) {
log.info("Trying to get Partitioned Metadata for a resource not exist"
+ "[{}] {}: {}", remoteAddress,
topicName, ex.getMessage());
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
topicName, ex.getMessage(), ex);
}
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(),
requestId);
}
Expand All @@ -702,6 +710,16 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof WebApplicationException restException) {
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
"Tenant or namespace or topic does not exist: " + topicName.getNamespace() ,
requestId));
lookupSemaphore.release();
return null;
}
}
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
Expand Down Expand Up @@ -3660,13 +3678,22 @@ protected void messageReceived() {
private static void logAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<TopicName> topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
remoteAddress, operation, principal, topicString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, topicString, ex);
remoteAddress, operation, principal, topicString, actEx.getMessage());
return;
} else if (actEx instanceof WebApplicationException restException){
// Do not print error log if users tries to access a not found resource.
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
log.info("[{}] Trying to authenticate for a topic which under a namespace not exists: operation={},"
+ " principal={}{}, reason: {}",
remoteAddress, operation, principal, topicString, actEx.getMessage());
return;
}
}
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, topicString, ex);
}

private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,4 +578,55 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}

@Test(dataProvider = "topicDomains")
public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception {
int lookupPermitsBefore = getLookupRequestPermits();
final String namespaceNotExist = BrokerTestUtil.newUniqueName("public/ns");
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp");
PulsarClientImpl[] clientArray = getClientsToTest(false);
for (PulsarClientImpl client : clientArray) {
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, true, true)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}

@Test(dataProvider = "topicDomains")
public void testTenantNotExist(TopicDomain topicDomain) throws Exception {
int lookupPermitsBefore = getLookupRequestPermits();
final String tenantNotExist = BrokerTestUtil.newUniqueName("tenant");
final String namespaceNotExist = BrokerTestUtil.newUniqueName(tenantNotExist + "/default");
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp");
PulsarClientImpl[] clientArray = getClientsToTest(false);
for (PulsarClientImpl client : clientArray) {
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, true, true)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}
}
Loading