diff --git a/modules/elasticsearch/elasticsearch.go b/modules/elasticsearch/elasticsearch.go index 5fab503b36..e878c3e6ef 100644 --- a/modules/elasticsearch/elasticsearch.go +++ b/modules/elasticsearch/elasticsearch.go @@ -2,6 +2,8 @@ package elasticsearch import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "io" "os" @@ -15,6 +17,7 @@ const ( defaultTCPPort = "9300" defaultPassword = "changeme" defaultUsername = "elastic" + defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt" minimalImageVersion = "7.9.2" ) @@ -32,7 +35,7 @@ type ElasticsearchContainer struct { } // Deprecated: use Run instead -// RunContainer creates an instance of the Couchbase container type +// RunContainer creates an instance of the Elasticsearch container type func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) { return Run(ctx, "docker.elastic.co/elasticsearch/elasticsearch:7.9.2", opts...) } @@ -50,54 +53,41 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom defaultHTTPPort + "/tcp", defaultTCPPort + "/tcp", }, - // regex that - // matches 8.3 JSON logging with started message and some follow up content within the message field - // matches 8.0 JSON logging with no whitespace between message field and content - // matches 7.x JSON logging with whitespace between message field and content - // matches 6.x text logging with node name in brackets and just a 'started' message till the end of the line - WaitingFor: wait.ForLog(`.*("message":\s?"started(\s|")?.*|]\sstarted\n)`).AsRegexp(), - LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ - { - // the container needs a post create hook to set the default JVM options in a file - PostCreates: []testcontainers.ContainerHook{}, - PostReadies: []testcontainers.ContainerHook{}, - }, - }, }, Started: true, } // Gather all config options (defaults and then apply provided options) - settings := defaultOptions() + options := defaultOptions() for _, opt := range opts { if apply, ok := opt.(Option); ok { - apply(settings) + apply(options) } if err := opt.Customize(&req); err != nil { return nil, err } } - // Transfer the certificate settings to the container request - err := configureCertificate(settings, &req) - if err != nil { - return nil, err - } - // Transfer the password settings to the container request - err = configurePassword(settings, &req) - if err != nil { + if err := configurePassword(options, &req); err != nil { return nil, err } if isAtLeastVersion(req.Image, 7) { - req.LifecycleHooks[0].PostCreates = append(req.LifecycleHooks[0].PostCreates, configureJvmOpts) + req.LifecycleHooks = append(req.LifecycleHooks, + testcontainers.ContainerLifecycleHooks{ + PostCreates: []testcontainers.ContainerHook{configureJvmOpts}, + }, + ) } + // Set the default waiting strategy if not already set. + setWaitFor(options, &req.ContainerRequest) + container, err := testcontainers.GenericContainer(ctx, req) var esContainer *ElasticsearchContainer if container != nil { - esContainer = &ElasticsearchContainer{Container: container, Settings: *settings} + esContainer = &ElasticsearchContainer{Container: container, Settings: *options} } if err != nil { return esContainer, fmt.Errorf("generic container: %w", err) @@ -110,6 +100,61 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom return esContainer, nil } +// certWriter is a helper that writes the details of a CA cert to options. +type certWriter struct { + options *Options + certPool *x509.CertPool +} + +// Read reads the CA cert from the reader and appends it to the options. +func (w *certWriter) Read(r io.Reader) error { + buf, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("read CA cert: %w", err) + } + + w.options.CACert = buf + w.certPool.AppendCertsFromPEM(w.options.CACert) + + return nil +} + +// setWaitFor sets the req.WaitingFor strategy based on settings. +func setWaitFor(options *Options, req *testcontainers.ContainerRequest) { + var strategies []wait.Strategy + if req.WaitingFor != nil { + // Custom waiting strategy, ensure we honour it. + strategies = append(strategies, req.WaitingFor) + } + + waitHTTP := wait.ForHTTP("/").WithPort(defaultHTTPPort) + if sslRequired(req) { + waitHTTP = waitHTTP.WithTLS(true).WithAllowInsecure(true) + cw := &certWriter{ + options: options, + certPool: x509.NewCertPool(), + } + + waitHTTP = waitHTTP. + WithTLS(true, &tls.Config{RootCAs: cw.certPool}) + + strategies = append(strategies, wait.ForFile(defaultCaCertPath).WithMatcher(cw.Read)) + } + + if options.Password != "" || options.Username != "" { + waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password) + } + + strategies = append(strategies, waitHTTP) + + if len(strategies) > 1 { + req.WaitingFor = wait.ForAll(strategies...) + return + } + + req.WaitingFor = strategies[0] +} + // configureAddress sets the address of the Elasticsearch container. // If the certificate is set, it will use https as protocol, otherwise http. func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error { @@ -133,50 +178,28 @@ func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error { return nil } -// configureCertificate transfers the certificate settings to the container request. -// For that, it defines a post start hook that copies the certificate from the container to the host. -// The certificate is only available since version 8, and will be located in a well-known location. -func configureCertificate(settings *Options, req *testcontainers.GenericContainerRequest) error { - if isAtLeastVersion(req.Image, 8) { - // These configuration keys explicitly disable CA generation. - // If any are set we skip the file retrieval. - configKeys := []string{ - "xpack.security.enabled", - "xpack.security.http.ssl.enabled", - "xpack.security.transport.ssl.enabled", - } - for _, configKey := range configKeys { - if value, ok := req.Env[configKey]; ok { - if value == "false" { - return nil - } +// sslRequired returns true if the SSL is required, otherwise false. +func sslRequired(req *testcontainers.ContainerRequest) bool { + if !isAtLeastVersion(req.Image, 8) { + return false + } + + // These configuration keys explicitly disable CA generation. + // If any are set we skip the file retrieval. + configKeys := []string{ + "xpack.security.enabled", + "xpack.security.http.ssl.enabled", + "xpack.security.transport.ssl.enabled", + } + for _, configKey := range configKeys { + if value, ok := req.Env[configKey]; ok { + if value == "false" { + return false } } - - // The container needs a post ready hook to copy the certificate from the container to the host. - // This certificate is only available since version 8 - req.LifecycleHooks[0].PostReadies = append(req.LifecycleHooks[0].PostReadies, - func(ctx context.Context, container testcontainers.Container) error { - const defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt" - - readCloser, err := container.CopyFileFromContainer(ctx, defaultCaCertPath) - if err != nil { - return err - } - - // receive the bytes from the default location - certBytes, err := io.ReadAll(readCloser) - if err != nil { - return err - } - - settings.CACert = certBytes - - return nil - }) } - return nil + return true } // configurePassword transfers the password settings to the container request. diff --git a/modules/elasticsearch/options.go b/modules/elasticsearch/options.go index ed801c3b09..ba4dca75c3 100644 --- a/modules/elasticsearch/options.go +++ b/modules/elasticsearch/options.go @@ -16,7 +16,6 @@ type Options struct { func defaultOptions() *Options { return &Options{ - CACert: nil, Username: defaultUsername, } }