Commit 0f0704e

Merge remote-tracking branch 'origin/main' into lucene_snapshot

elasticsearchmachine committed Aug 20, 2024
2 parents d03c197 + 6f3fab9 commit 0f0704e

Showing 144 changed files with 3,483 additions and 1,000 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111855.yaml
@@ -0,0 +1,5 @@
pr: 111855
summary: "ESQL: Profile more timing information"
area: ES|QL
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/111943.yaml
@@ -0,0 +1,6 @@
pr: 111943
summary: Fix synthetic source for empty nested objects
area: Mapping
type: bug
issues:
- 111811
7 changes: 7 additions & 0 deletions docs/changelog/111955.yaml
@@ -0,0 +1,7 @@
pr: 111955
summary: Clean up dangling S3 multipart uploads
area: Snapshot/Restore
type: enhancement
issues:
- 101169
- 44971
6 changes: 6 additions & 0 deletions docs/changelog/111968.yaml
@@ -0,0 +1,6 @@
pr: 111968
summary: "ESQL: don't lose the original casting error message"
area: ES|QL
type: bug
issues:
- 111967
15 changes: 15 additions & 0 deletions docs/changelog/111972.yaml
@@ -0,0 +1,15 @@
pr: 111972
summary: Introduce global retention in data stream lifecycle.
area: Data streams
type: feature
issues: []
highlight:
title: Add global retention in data stream lifecycle
body: "Data stream lifecycle now supports configuring retention on a cluster level,\
\ namely global retention. Global retention \nallows us to configure two different\
\ retentions:\n\n- `data_streams.lifecycle.retention.default` is applied to all\
\ data streams managed by the data stream lifecycle that do not have retention\n\
defined on the data stream level.\n- `data_streams.lifecycle.retention.max` is\
\ applied to all data streams managed by the data stream lifecycle and it allows\
\ any data stream \ndata to be deleted after the `max_retention` has passed."
notable: true
6 changes: 6 additions & 0 deletions docs/changelog/111983.yaml
@@ -0,0 +1,6 @@
pr: 111983
summary: Avoid losing error message in failure collector
area: ES|QL
type: bug
issues:
- 111894
6 changes: 6 additions & 0 deletions docs/changelog/112005.yaml
@@ -0,0 +1,6 @@
pr: 112005
summary: Check for valid `parentDoc` before retrieving its previous
area: Mapping
type: bug
issues:
- 111990
1 change: 1 addition & 0 deletions docs/reference/api-conventions.asciidoc
@@ -334,6 +334,7 @@ All REST API parameters (both request parameters and JSON body) support
providing boolean "false" as the value `false` and boolean "true" as the
value `true`. All other values will raise an error.

[[api-conventions-number-values]]
[discrete]
=== Number Values

1 change: 1 addition & 0 deletions docs/reference/esql/functions/kibana/docs/to_datetime.md

4 changes: 4 additions & 0 deletions docs/reference/release-notes/8.15.0.asciidoc
@@ -22,6 +22,10 @@ Either downgrade to an earlier version, upgrade to 8.15.1, or else follow the
recommendation in the manual to entirely disable swap instead of using the
memory lock feature (issue: {es-issue}111847[#111847])

* The `took` field of the response to the <<docs-bulk>> API is incorrect and may be rather large. Clients which
<<api-conventions-number-values,incorrectly>> assume that this value will be within a particular range (e.g. that it fits into a 32-bit
signed integer) may encounter errors (issue: {es-issue}111854[#111854])

[[breaking-8.15.0]]
[float]
=== Breaking changes
12 changes: 12 additions & 0 deletions docs/reference/settings/data-stream-lifecycle-settings.asciidoc
@@ -10,6 +10,18 @@ These are the settings available for configuring <<data-stream-lifecycle, data s

==== Cluster level settings

[[data-streams-lifecycle-retention-max]]
`data_streams.lifecycle.retention.max`::
(<<dynamic-cluster-setting,Dynamic>>, <<time-units, time unit value>>)
The maximum retention period that will apply to all user data streams managed by the data stream lifecycle. The max retention will also
override the retention of a data stream whose configured retention exceeds the max retention. It should be greater than `10s`.

[[data-streams-lifecycle-retention-default]]
`data_streams.lifecycle.retention.default`::
(<<dynamic-cluster-setting,Dynamic>>, <<time-units, time unit value>>)
The retention period that will apply to all user data streams managed by the data stream lifecycle that do not have retention configured.
It should be greater than `10s` and less than or equal to <<data-streams-lifecycle-retention-max, `data_streams.lifecycle.retention.max`>> (see the example request below).

[[data-streams-lifecycle-poll-interval]]
`data_streams.lifecycle.poll_interval`::
(<<dynamic-cluster-setting,Dynamic>>, <<time-units, time unit value>>)
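
For example, both retention settings introduced above can be updated dynamically through the cluster settings API. This is a minimal sketch; the values `7d` and `90d` are illustrative only, not recommendations:

[source,console]
----
PUT _cluster/settings
{
  "persistent": {
    "data_streams.lifecycle.retention.default": "7d",
    "data_streams.lifecycle.retention.max": "90d"
  }
}
----

A data stream whose own configured retention exceeds `data_streams.lifecycle.retention.max` is capped at that maximum, as described above.
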
36 changes: 9 additions & 27 deletions docs/reference/snapshot-restore/repository-s3.asciidoc
@@ -317,6 +317,15 @@ include::repository-shared-settings.asciidoc[]
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html[AWS
DeleteObjects API].

`max_multipart_upload_cleanup_size`::

(<<number,numeric>>) Sets the maximum number of possibly-dangling multipart
uploads to clean up in each batch of snapshot deletions. Defaults to `1000`,
which is the maximum number supported by the
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS
ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up
dangling multipart uploads.
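
For example, the setting might be supplied when registering an S3 repository. This is a sketch only; the repository name `my_s3_repository`, the bucket name `my-bucket`, and the value `500` are placeholders:

[source,console]
----
PUT _snapshot/my_s3_repository
{
  "type": "s3",
  "settings": {
    "bucket": "my-bucket",
    "max_multipart_upload_cleanup_size": 500
  }
}
----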

NOTE: The option of defining client settings in the repository settings as
documented below is considered deprecated, and will be removed in a future
version.
@@ -492,33 +501,6 @@ by the `elasticsearch` user. By default, {es} runs as user `elasticsearch` using

If the symlink exists, it will be used by default by all S3 repositories that don't have explicit `client` credentials.

==== Cleaning up multi-part uploads

{es} uses S3's multi-part upload process to upload larger blobs to the
repository. The multi-part upload process works by dividing each blob into
smaller parts, uploading each part independently, and then completing the
upload in a separate step. This reduces the amount of data that {es} must
re-send if an upload fails: {es} only needs to re-send the part that failed
rather than starting from the beginning of the whole blob. The storage for each
part is charged independently starting from the time at which the part was
uploaded.

If a multi-part upload cannot be completed then it must be aborted in order to
delete any parts that were successfully uploaded, preventing further storage
charges from accumulating. {es} will automatically abort a multi-part upload on
failure, but sometimes the abort request itself fails. For example, if the
repository becomes inaccessible or the instance on which {es} is running is
terminated abruptly then {es} cannot complete or abort any ongoing uploads.

You must make sure that failed uploads are eventually aborted to avoid
unnecessary storage costs. You can use the
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[List
multipart uploads API] to list the ongoing uploads and look for any which are
unusually long-running, or you can
https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html[configure
a bucket lifecycle policy] to automatically abort incomplete uploads once they
reach a certain age.

[[repository-s3-aws-vpc]]
==== AWS VPC bandwidth settings

@@ -16,6 +16,7 @@
import com.sun.jna.Pointer;
import com.sun.jna.Structure;

import org.elasticsearch.nativeaccess.CloseableByteBuffer;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;

import java.util.Arrays;
@@ -109,6 +110,16 @@ public long bytesalloc() {
}
}

public static class JnaSockAddr implements SockAddr {
final Memory memory;

JnaSockAddr(String path) {
this.memory = new Memory(110);
memory.setShort(0, AF_UNIX);
memory.setString(2, path, "UTF-8");
}
}

private interface NativeFunctions extends Library {
int geteuid();

@@ -126,6 +137,12 @@ private interface NativeFunctions extends Library {

int close(int fd);

int socket(int domain, int type, int protocol);

int connect(int sockfd, Pointer addr, int addrlen);

long send(int sockfd, Pointer buf, long buflen, int flags);

String strerror(int errno);
}

@@ -235,6 +252,30 @@ public int fstat64(int fd, Stat64 stats) {
return fstat64.fstat64(fd, jnaStats.memory);
}

@Override
public int socket(int domain, int type, int protocol) {
return functions.socket(domain, type, protocol);
}

@Override
public SockAddr newUnixSockAddr(String path) {
return new JnaSockAddr(path);
}

@Override
public int connect(int sockfd, SockAddr addr) {
assert addr instanceof JnaSockAddr;
var jnaAddr = (JnaSockAddr) addr;
return functions.connect(sockfd, jnaAddr.memory, (int) jnaAddr.memory.size());
}

@Override
public long send(int sockfd, CloseableByteBuffer buffer, int flags) {
assert buffer instanceof JnaCloseableByteBuffer;
var nativeBuffer = (JnaCloseableByteBuffer) buffer;
return functions.send(sockfd, nativeBuffer.memory, nativeBuffer.buffer().remaining(), flags);
}

@Override
public String strerror(int errno) {
return functions.strerror(errno);
@@ -12,7 +12,7 @@
import org.elasticsearch.nativeaccess.lib.LinuxCLibrary.SockFProg;
import org.elasticsearch.nativeaccess.lib.LinuxCLibrary.SockFilter;
import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;

import java.util.Map;

@@ -92,7 +92,14 @@ record Arch(
LinuxNativeAccess(NativeLibraryProvider libraryProvider) {
super("Linux", libraryProvider, new PosixConstants(-1L, 9, 1, 8, 64, 144, 48, 64));
this.linuxLibc = libraryProvider.getLibrary(LinuxCLibrary.class);
String socketPath = System.getenv("NOTIFY_SOCKET");
if (socketPath == null) {
this.systemd = null; // not running under systemd
} else {
logger.debug("Systemd socket path: {}", socketPath);
var buffer = newBuffer(64);
this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer);
}
}

@Override
@@ -10,17 +10,28 @@

import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;

import java.nio.charset.StandardCharsets;

/**
* Wraps access to notifications to systemd.
* <p>
* Systemd notifications are done through a Unix socket. Although Java does support
* opening unix sockets, it unfortunately does not support datagram sockets. This class
* instead opens and communicates with the socket using native methods.
*/
public class Systemd {
private static final Logger logger = LogManager.getLogger(Systemd.class);

private final PosixCLibrary libc;
private final String socketPath;
private final CloseableByteBuffer buffer;

Systemd(PosixCLibrary libc, String socketPath, CloseableByteBuffer buffer) {
this.libc = libc;
this.socketPath = socketPath;
this.buffer = buffer;
}

/**
@@ -41,15 +52,61 @@ public void notify_stopping() {
}

private void notify(String state, boolean warnOnError) {
int sockfd = libc.socket(PosixCLibrary.AF_UNIX, PosixCLibrary.SOCK_DGRAM, 0);
if (sockfd < 0) {
throwOrLog("Could not open systemd socket: " + libc.strerror(libc.errno()), warnOnError);
return;
}
RuntimeException error = null;
try {
var sockAddr = libc.newUnixSockAddr(socketPath);
if (libc.connect(sockfd, sockAddr) != 0) {
throwOrLog("Could not connect to systemd socket: " + libc.strerror(libc.errno()), warnOnError);
return;
}

byte[] bytes = state.getBytes(StandardCharsets.US_ASCII);
final long bytesSent;
synchronized (buffer) {
buffer.buffer().clear();
buffer.buffer().put(0, bytes);
buffer.buffer().limit(bytes.length);
bytesSent = libc.send(sockfd, buffer, 0);
}

if (bytesSent == -1) {
throwOrLog("Failed to send message (" + state + ") to systemd socket: " + libc.strerror(libc.errno()), warnOnError);
} else if (bytesSent != bytes.length) {
throwOrLog("Not all bytes of message (" + state + ") sent to systemd socket (sent " + bytesSent + ")", warnOnError);
} else {
logger.trace("Message (" + state + ") sent to systemd");
}
} catch (RuntimeException e) {
error = e;
} finally {
if (libc.close(sockfd) != 0) {
try {
throwOrLog("Could not close systemd socket: " + libc.strerror(libc.errno()), warnOnError);
} catch (RuntimeException e) {
if (error != null) {
error.addSuppressed(e);
throw error;
} else {
throw e;
}
}
} else if (error != null) {
throw error;
}
}
}

private void throwOrLog(String message, boolean warnOnError) {
if (warnOnError) {
logger.warn(message);
} else {
logger.error(message);
throw new RuntimeException(message);
}
}
}