Skip to content

Commit

Permalink
fix(elasticsearch): wait for
Browse files Browse the repository at this point in the history
Wait for the HTTP port to be available to prevent random failures when
the container isn't fully started and returns 503 errors.
  • Loading branch information
stevenh committed Sep 10, 2024
1 parent b4f8294 commit 0753e11
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 67 deletions.
159 changes: 93 additions & 66 deletions modules/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package elasticsearch

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"
Expand All @@ -15,6 +17,7 @@ const (
defaultTCPPort = "9300"
defaultPassword = "changeme"
defaultUsername = "elastic"
defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"
minimalImageVersion = "7.9.2"
)

Expand All @@ -32,13 +35,14 @@ type ElasticsearchContainer struct {
}

// Deprecated: use Run instead
// RunContainer creates an instance of the Couchbase container type
// RunContainer creates an instance of the Elasticsearch container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) {
return Run(ctx, "docker.elastic.co/elasticsearch/elasticsearch:7.9.2", opts...)
}

// Run creates an instance of the Elasticsearch container type
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) {

Check failure on line 45 in modules/elasticsearch/elasticsearch.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, elasticsearch) / modules/elasticsearch/ubuntu-latest/1.22.x

File is not `gofumpt`-ed (gofumpt)

Check failure on line 45 in modules/elasticsearch/elasticsearch.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, elasticsearch) / modules/elasticsearch/ubuntu-latest/1.x

File is not `gofumpt`-ed (gofumpt)
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: img,
Expand All @@ -50,56 +54,43 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
defaultHTTPPort + "/tcp",
defaultTCPPort + "/tcp",
},
// regex that
// matches 8.3 JSON logging with started message and some follow up content within the message field
// matches 8.0 JSON logging with no whitespace between message field and content
// matches 7.x JSON logging with whitespace between message field and content
// matches 6.x text logging with node name in brackets and just a 'started' message till the end of the line
WaitingFor: wait.ForLog(`.*("message":\s?"started(\s|")?.*|]\sstarted\n)`).AsRegexp(),
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
{
// the container needs a post create hook to set the default JVM options in a file
PostCreates: []testcontainers.ContainerHook{},
PostReadies: []testcontainers.ContainerHook{},
},
},
},
Started: true,
}

// Gather all config options (defaults and then apply provided options)
settings := defaultOptions()
options := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(settings)
apply(options)
}
if err := opt.Customize(&req); err != nil {
return nil, err
}
}

// Transfer the certificate settings to the container request
err := configureCertificate(settings, &req)
if err != nil {
return nil, err
}

// Transfer the password settings to the container request
err = configurePassword(settings, &req)
if err != nil {
if err := configurePassword(options, &req); err != nil {
return nil, err
}

if isAtLeastVersion(req.Image, 7) {
req.LifecycleHooks[0].PostCreates = append(req.LifecycleHooks[0].PostCreates, configureJvmOpts)
req.LifecycleHooks = append(req.LifecycleHooks,
testcontainers.ContainerLifecycleHooks{
PostCreates: []testcontainers.ContainerHook{configureJvmOpts},
},
)
}

// Set the default waiting strategy if not already set.
setWaitFor(options, &req.ContainerRequest)

container, err := testcontainers.GenericContainer(ctx, req)
if err != nil {
return nil, err
}

esContainer := &ElasticsearchContainer{Container: container, Settings: *settings}
esContainer := &ElasticsearchContainer{Container: container, Settings: *options}

address, err := configureAddress(ctx, esContainer)
if err != nil {
Expand All @@ -111,6 +102,64 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return esContainer, nil
}

// certWriter is a helper that writes the details of a CA cert to options.
type certWriter struct {
options *Options
certPool *x509.CertPool
}

// Read reads the CA cert from the reader and appends it to the options.
func (w *certWriter) Read(r io.Reader) error {
buf, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("read CA cert: %w", err)
}

w.options.CACert = buf
w.certPool.AppendCertsFromPEM(w.options.CACert)

return nil
}

// setWaitFor sets the req.WaitingFor strategy based on settings.
func setWaitFor(options *Options, req *testcontainers.ContainerRequest) {
var strategies []wait.Strategy
if req.WaitingFor != nil {
// Custom waiting strategy, ensure we honour it.
strategies = append(strategies, req.WaitingFor)
}

waitHTTP := wait.ForHTTP("/").WithPort(defaultHTTPPort)
if sslRequired(req) {
waitHTTP = waitHTTP.WithTLS(true).WithAllowInsecure(true)
cw := &certWriter{
options: options,
certPool: x509.NewCertPool(),
}

waitHTTP := wait.ForHTTP("/").
WithPort(defaultHTTPPort).
WithTLS(true, &tls.Config{RootCAs: cw.certPool})
if options.Password != "" || options.Username != "" {
waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password)

Check failure on line 144 in modules/elasticsearch/elasticsearch.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, elasticsearch) / modules/elasticsearch/ubuntu-latest/1.22.x

SA4006: this value of `waitHTTP` is never used (staticcheck)

Check failure on line 144 in modules/elasticsearch/elasticsearch.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, elasticsearch) / modules/elasticsearch/ubuntu-latest/1.x

SA4006: this value of `waitHTTP` is never used (staticcheck)
}
strategies = append(strategies, wait.ForFile(defaultCaCertPath).WithMatcher(cw.Read))
}

if options.Password != "" || options.Username != "" {
waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password)
}

strategies = append(strategies, waitHTTP)

if len(strategies) > 1 {
req.WaitingFor = wait.ForAll(strategies...)
return
}

req.WaitingFor = strategies[0]
}

// configureAddress sets the address of the Elasticsearch container.
// If the certificate is set, it will use https as protocol, otherwise http.
func configureAddress(ctx context.Context, c *ElasticsearchContainer) (string, error) {
Expand All @@ -132,50 +181,28 @@ func configureAddress(ctx context.Context, c *ElasticsearchContainer) (string, e
return fmt.Sprintf("%s://%s:%s", proto, host, containerPort.Port()), nil
}

// configureCertificate transfers the certificate settings to the container request.
// For that, it defines a post start hook that copies the certificate from the container to the host.
// The certificate is only available since version 8, and will be located in a well-known location.
func configureCertificate(settings *Options, req *testcontainers.GenericContainerRequest) error {
if isAtLeastVersion(req.Image, 8) {
// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return nil
}
// sslRequired returns true if the SSL is required, otherwise false.
func sslRequired(req *testcontainers.ContainerRequest) bool {
if !isAtLeastVersion(req.Image, 8) {
return false
}

// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return false
}
}

// The container needs a post ready hook to copy the certificate from the container to the host.
// This certificate is only available since version 8
req.LifecycleHooks[0].PostReadies = append(req.LifecycleHooks[0].PostReadies,
func(ctx context.Context, container testcontainers.Container) error {
const defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"

readCloser, err := container.CopyFileFromContainer(ctx, defaultCaCertPath)
if err != nil {
return err
}

// receive the bytes from the default location
certBytes, err := io.ReadAll(readCloser)
if err != nil {
return err
}

settings.CACert = certBytes

return nil
})
}

return nil
return true
}

// configurePassword transfers the password settings to the container request.
Expand Down
1 change: 0 additions & 1 deletion modules/elasticsearch/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Options struct {

func defaultOptions() *Options {
return &Options{
CACert: nil,
Username: defaultUsername,
}
}
Expand Down

0 comments on commit 0753e11

Please sign in to comment.