Add audit logging for streamed HTTP content (#130594)

This commit is contained in:
Mikhail Berezovskiy 2025-07-07 11:38:25 -07:00 committed by GitHub
parent e22b20704a
commit 209caaf9ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 48 additions and 16 deletions

View File

@ -0,0 +1,5 @@
pr: 130594
summary: Add audit logging for stream content
area: Network
type: enhancement
issues: []

View File

@ -109,10 +109,11 @@ public class AuditIT extends ESRestTestCase {
public void testAuditAuthenticationSuccessForStreamingRequest() throws Exception { public void testAuditAuthenticationSuccessForStreamingRequest() throws Exception {
final Request request = new Request("POST", "/testindex/_bulk"); final Request request = new Request("POST", "/testindex/_bulk");
request.setEntity(new StringEntity(""" final String content = """
{"index":{}} {"index":{}}
{} {}
""", ContentType.create("application/x-ndjson", StandardCharsets.UTF_8))); """;
request.setEntity(new StringEntity(content, ContentType.create("application/x-ndjson", StandardCharsets.UTF_8)));
executeAndVerifyAudit( executeAndVerifyAudit(
request, request,
AuditLevel.AUTHENTICATION_SUCCESS, AuditLevel.AUTHENTICATION_SUCCESS,
@ -120,7 +121,7 @@ public class AuditIT extends ESRestTestCase {
event, event,
allOf( allOf(
hasEntry(LoggingAuditTrail.AUTHENTICATION_TYPE_FIELD_NAME, "REALM"), hasEntry(LoggingAuditTrail.AUTHENTICATION_TYPE_FIELD_NAME, "REALM"),
hasEntry(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, "Request body had not been received at the time of the audit event") hasEntry(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, content)
) )
) )
); );

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.AuthenticationToken; import org.elasticsearch.xpack.core.security.authc.AuthenticationToken;
import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo;
import org.elasticsearch.xpack.security.Security; import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule; import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -53,6 +54,14 @@ public class AuditTrailService {
} }
} }
public boolean includeRequestBody() {
if (get() instanceof LoggingAuditTrail trail) {
return trail.includeRequestBody();
} else {
return false;
}
}
// TODO: this method only exists for access to LoggingAuditTrail in a Node for testing. // TODO: this method only exists for access to LoggingAuditTrail in a Node for testing.
// DO NOT USE IT, IT WILL BE REMOVED IN THE FUTURE // DO NOT USE IT, IT WILL BE REMOVED IN THE FUTURE
public AuditTrail getAuditTrail() { public AuditTrail getAuditTrail() {

View File

@ -27,9 +27,6 @@ public class AuditUtil {
public static String restRequestContent(RestRequest request) { public static String restRequestContent(RestRequest request) {
if (request.hasContent()) { if (request.hasContent()) {
if (request.isStreamedContent()) {
return "Request body had not been received at the time of the audit event";
}
var content = request.content(); var content = request.content();
try { try {
return XContentHelper.convertToJson(content, false, false, request.getXContentType()); return XContentHelper.convertToJson(content, false, false, request.getXContentType());

View File

@ -350,7 +350,7 @@ public class LoggingAuditTrail implements AuditTrail, ClusterStateListener {
final EventFilterPolicyRegistry eventFilterPolicyRegistry; final EventFilterPolicyRegistry eventFilterPolicyRegistry;
// package for testing // package for testing
volatile EnumSet<AuditLevel> events; volatile EnumSet<AuditLevel> events;
boolean includeRequestBody; volatile boolean includeRequestBody;
// fields that all entries have in common // fields that all entries have in common
EntryCommonFields entryCommonFields; EntryCommonFields entryCommonFields;
@ -1072,6 +1072,10 @@ public class LoggingAuditTrail implements AuditTrail, ClusterStateListener {
// not implemented yet // not implemented yet
} }
public boolean includeRequestBody() {
return includeRequestBody;
}
private LogEntryBuilder securityChangeLogEntryBuilder(String requestId) { private LogEntryBuilder securityChangeLogEntryBuilder(String requestId) {
return new LogEntryBuilder(false).with(EVENT_TYPE_FIELD_NAME, SECURITY_CHANGE_ORIGIN_FIELD_VALUE).withRequestId(requestId); return new LogEntryBuilder(false).with(EVENT_TYPE_FIELD_NAME, SECURITY_CHANGE_ORIGIN_FIELD_VALUE).withRequestId(requestId);
} }

View File

@ -22,7 +22,10 @@ import org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator;
import org.elasticsearch.xpack.security.authz.restriction.WorkflowService; import org.elasticsearch.xpack.security.authz.restriction.WorkflowService;
import org.elasticsearch.xpack.security.operator.OperatorPrivileges; import org.elasticsearch.xpack.security.operator.OperatorPrivileges;
import java.util.function.Consumer;
import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.rest.RestContentAggregator.aggregate;
public class SecurityRestFilter implements RestInterceptor { public class SecurityRestFilter implements RestInterceptor {
@ -70,16 +73,29 @@ public class SecurityRestFilter implements RestInterceptor {
return; return;
} }
final RestRequest wrappedRequest = maybeWrapRestRequest(request, targetHandler); // RestRequest might have stream content, in some cases we need to aggregate request content, for example audit logging.
auditTrailService.get().authenticationSuccess(wrappedRequest); final Consumer<RestRequest> aggregationCallback = (aggregatedRestRequest) -> {
secondaryAuthenticator.authenticateAndAttachToContext(wrappedRequest, ActionListener.wrap(secondaryAuthentication -> { final RestRequest wrappedRequest = maybeWrapRestRequest(aggregatedRestRequest, targetHandler);
if (secondaryAuthentication != null) { auditTrailService.get().authenticationSuccess(wrappedRequest);
logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, request.uri()); secondaryAuthenticator.authenticateAndAttachToContext(wrappedRequest, ActionListener.wrap(secondaryAuthentication -> {
} if (secondaryAuthentication != null) {
WorkflowService.resolveWorkflowAndStoreInThreadContext(targetHandler, threadContext); logger.trace(
"Found secondary authentication {} in REST request [{}]",
secondaryAuthentication,
aggregatedRestRequest.uri()
);
}
WorkflowService.resolveWorkflowAndStoreInThreadContext(targetHandler, threadContext);
doHandleRequest(aggregatedRestRequest, channel, targetHandler, listener);
}, e -> handleException(aggregatedRestRequest, e, listener)));
};
if (request.isStreamedContent() && auditTrailService.includeRequestBody()) {
aggregate(request, aggregationCallback::accept);
} else {
aggregationCallback.accept(request);
}
doHandleRequest(request, channel, targetHandler, listener);
}, e -> handleException(request, e, listener)));
} }
private void doHandleRequest(RestRequest request, RestChannel channel, RestHandler targetHandler, ActionListener<Boolean> listener) { private void doHandleRequest(RestRequest request, RestChannel channel, RestHandler targetHandler, ActionListener<Boolean> listener) {