Skip to content

Commit

Permalink
Basic kerberos support
Browse files Browse the repository at this point in the history
This adds mutual Kerberos authentication using a SASL handshake with the
namenode. A kerberos client must be constructed manually and passed to
hdfs.NewClient to enable support; the command-line client is currently only
configurable with a ccache file and hadoop.security.authentication set in the
hadoop configuration.
  • Loading branch information
Shastick authored and colinmarc committed Jul 20, 2018
1 parent ca25fc2 commit f4b29de
Show file tree
Hide file tree
Showing 129 changed files with 15,010 additions and 77 deletions.
77 changes: 76 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "gopkg.in/jcmturner/gokrb5.v5"
version = "5.2.0"
66 changes: 57 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hdfs

import (
"errors"
"io"
"io/ioutil"
"os"
Expand All @@ -9,6 +10,7 @@ import (

hdfs "github.com/colinmarc/hdfs/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/rpc"
krb "gopkg.in/jcmturner/gokrb5.v5/client"
)

// A Client represents a connection to an HDFS cluster
Expand All @@ -27,15 +29,47 @@ type ClientOptions struct {
// is useful if you needed to create the namenode net.Conn manually for
// whatever reason.
Namenode *rpc.NamenodeConnection
// KerberosEnabled specifies whether the client should refuse to connect to
// namenodes not supporting Kerberos. If set to true, then KerberosClient and
// KerberosServiceName should be provided.
KerberosEnabled bool
// KerberosClient is used to connect to kerberized HDFS clusters if
// KerberosEnabled is set to true (in which case it is required).
KerberosClient krb.Client
// KerberosServiceName specifies the <SERVICE> part of the Service Principal
// Name (<SERVICE>/<FQDN>) for the namenode(s). It is required if
// KerberosEnabled is set to true.
KerberosServiceName string
}

// ClientOptionsFromConf attempts to load any relevant configuration options
// from the given Hadoop configuration and create a ClientOptions struct
// suitable for creating a Client. Currently this is restricted to the namenode
// address(es), but may be expanded in the future.
// suitable for creating a Client. Currently this sets the following fields
// on ClientOptions:
// // Determined by fs.defaultFS (or the deprecated fs.default.name), or
// // fields beginning with dfs.namenode.rpc-address.
// Addresses []string
// // Determined to be true if the value for hadoop.security.authentication is
// // equal to 'kerberos'.
// KerberosEnabled bool
// // Determined by the first half of dfs.namenode.kerberos.principal.
// KerberosServiceName string
func ClientOptionsFromConf(conf HadoopConf) (ClientOptions, error) {
	// Namenodes returns an error when the configuration holds no namenode
	// address. That error is handed back to the caller rather than being
	// swallowed, but the Kerberos-related fields below are still populated so
	// that a caller who supplies the address(es) itself can keep using the
	// rest of the options.
	namenodes, err := conf.Namenodes()
	options := ClientOptions{Addresses: namenodes}

	// Kerberos is considered enabled when hadoop.security.authentication is
	// "kerberos", compared case-insensitively.
	if strings.EqualFold(conf["hadoop.security.authentication"], "kerberos") {
		options.KerberosEnabled = true
	}

	// The service name is everything before the first "/" of the namenode
	// principal (<SERVICE>/<FQDN>...).
	if principal := conf["dfs.namenode.kerberos.principal"]; principal != "" {
		options.KerberosServiceName = strings.SplitN(principal, "/", 2)[0]
	}

	return options, err
}

// Username returns the value of HADOOP_USER_NAME in the environment, or
Expand All @@ -58,12 +92,25 @@ func Username() (string, error) {
// the client could not be created.
func NewClient(options ClientOptions) (*Client, error) {
var err error

if options.Namenode == nil {
if options.User == "" {
return nil, errors.New("user not specified")
}

if options.KerberosEnabled && options.KerberosClient.Credentials == nil {
return nil, errors.New("kerberos enabled, but no kerberos client provided")
}

if options.KerberosEnabled && options.KerberosServiceName == "" {
return nil, errors.New("kerberos enabled, but no kerberos service name provided")
}

options.Namenode, err = rpc.NewNamenodeConnectionWithOptions(
rpc.NamenodeConnectionOptions{
Addresses: options.Addresses,
User: options.User,
Addresses: options.Addresses,
User: options.User,
KerberosClient: options.KerberosClient,
KerberosServiceName: options.KerberosServiceName,
},
)
if err != nil {
Expand All @@ -76,9 +123,10 @@ func NewClient(options ClientOptions) (*Client, error) {

// New returns a connected Client, or an error if it can't connect. The user
// will be the current system user, or HADOOP_USER_NAME if set. Any relevant
// options (including the address(es) of the namenode(s), if an empty string is
// passed) will be loaded from the Hadoop configuration present at
// HADOOP_CONF_DIR or the default location.
// options (including the namenode address(es), if an empty string is passed)
// will be loaded from the Hadoop configuration present at HADOOP_CONF_DIR or
// the default location. Note, however, that New will not attempt any Kerberos
// authentication; use NewClient if you need that.
func New(address string) (*Client, error) {
conf := LoadHadoopConf("")
options, err := ClientOptionsFromConf(conf)
Expand Down
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func TestNewWithMultipleNodes(t *testing.T) {
}
_, err := NewClient(ClientOptions{
Addresses: []string{"localhost:80", nn},
User: "user",
})

assert.Nil(t, err)
}

Expand Down
60 changes: 60 additions & 0 deletions cmd/hdfs/kerberos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"os"
"os/user"
"strings"

krb "gopkg.in/jcmturner/gokrb5.v5/client"
"gopkg.in/jcmturner/gokrb5.v5/config"
"gopkg.in/jcmturner/gokrb5.v5/credentials"
)

// TODO: Write a kerberos_windows.go and move this to kerberos_unix.go. This
// assumes MIT kerberos on unix.

// getKerberosClient builds a Kerberos client from the local configuration file
// and credential cache.
//
// The configuration is read from the path in KRB5_CONFIG, falling back to
// /etc/krb5/krb5.conf. The credential cache is read from the path in
// KRB5CCNAME; only bare paths and values of the form FILE:<path> are accepted,
// and any other "<type>:" value is rejected with an error. If KRB5CCNAME is
// unset, /tmp/krb5cc_<uid> for the current user is used.
//
// Any failure along the way (loading the config, looking up the current user,
// loading the ccache, constructing the client) is returned to the caller.
func getKerberosClient() (krb.Client, error) {
	var client krb.Client

	configPath := os.Getenv("KRB5_CONFIG")
	if configPath == "" {
		// NOTE(review): many MIT Kerberos installs keep the default config at
		// /etc/krb5.conf rather than under /etc/krb5/ - confirm which default
		// is intended here.
		configPath = "/etc/krb5/krb5.conf"
	}

	cfg, err := config.Load(configPath)
	if err != nil {
		return client, err
	}

	// Determine the ccache location from the environment, falling back to the
	// default location.
	ccachePath := os.Getenv("KRB5CCNAME")
	switch {
	case strings.HasPrefix(ccachePath, "FILE:"):
		ccachePath = strings.TrimPrefix(ccachePath, "FILE:")
	case strings.Contains(ccachePath, ":"):
		// Some other cache type (anything else with a "<type>:" prefix); only
		// file-based caches can be loaded here.
		return client, fmt.Errorf("can't use ccache: %s", ccachePath)
	case ccachePath == "":
		u, err := user.Current()
		if err != nil {
			// Without the current user there is no default ccache path to
			// fall back to, so report the failure instead of handing back an
			// empty client with a nil error.
			return client, err
		}

		ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
	}

	ccache, err := credentials.LoadCCache(ccachePath)
	if err != nil {
		return client, err
	}

	client, err = krb.NewClientFromCCache(ccache)
	if err != nil {
		return client, err
	}

	// NOTE(review): the return value of WithConfig is discarded; this relies
	// on it attaching cfg to the receiver in place - confirm against the
	// gokrb5 version in use.
	client.WithConfig(cfg)
	return client, nil
}
44 changes: 35 additions & 9 deletions cmd/hdfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ Valid commands:
dfOpts = getopt.New()
dfh = dfOpts.Bool('h')

cachedClient *hdfs.Client
status = 0
cachedClients map[string]*hdfs.Client
status = 0
)

func init() {
Expand All @@ -89,6 +89,8 @@ func init() {
duOpts.SetUsage(printHelp)
getmergeOpts.SetUsage(printHelp)
dfOpts.SetUsage(printHelp)

cachedClients = make(map[string]*hdfs.Client)
}

func main() {
Expand Down Expand Up @@ -170,23 +172,47 @@ func fatalWithUsage(msg ...interface{}) {
}

func getClient(namenode string) (*hdfs.Client, error) {
if cachedClient != nil {
return cachedClient, nil
if cachedClients[namenode] != nil {
return cachedClients[namenode], nil
}

if namenode == "" {
namenode = os.Getenv("HADOOP_NAMENODE")
}

if namenode == "" && os.Getenv("HADOOP_CONF_DIR") == "" {
// Ignore errors here, since we don't care if the conf doesn't exist if the
// namenode was specified.
conf := hdfs.LoadHadoopConf("")
options, _ := hdfs.ClientOptionsFromConf(conf)
if namenode != "" {
options.Addresses = []string{namenode}
}

if options.Addresses == nil {
return nil, errors.New("Couldn't find a namenode to connect to. You should specify hdfs://<namenode>:<port> in your paths. Alternatively, set HADOOP_NAMENODE or HADOOP_CONF_DIR in your environment.")
}

c, err := hdfs.New(namenode)
var err error
if options.KerberosEnabled {
options.KerberosClient, err = getKerberosClient()
if err != nil {
return nil, fmt.Errorf("Problem with kerberos authentication: %s", err)
}

creds := options.KerberosClient.Credentials
options.User = creds.Username + "@" + creds.Realm
} else {
options.User, err = hdfs.Username()
if err != nil {
return nil, fmt.Errorf("Couldn't determine user: %s", err)
}
}

c, err := hdfs.NewClient(options)
if err != nil {
return nil, err
return nil, fmt.Errorf("Couldn't connect to namenode: %s", err)
}

cachedClient = c
return cachedClient, nil
cachedClients[namenode] = c
return c, nil
}
9 changes: 5 additions & 4 deletions conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ type propertyList struct {
// pairs found in a user's hadoop configuration files.
type HadoopConf map[string]string

var errUnresolvedNamenode = errors.New("no namenode address in configuration")
var errNoNamenodesInConf = errors.New("no namenode address(es) in configuration")

// LoadHadoopConf returns a HadoopConf object representing configuration from
// the specified path, or finds the correct path in the environment. If
// path or the env variable HADOOP_CONF_DIR is specified, it should point
// directly to the directory where the xml files are. If neither is specified,
// ${HADOOP_HOME}/conf will be used.
func LoadHadoopConf(path string) HadoopConf {

if path == "" {
path = os.Getenv("HADOOP_CONF_DIR")
if path == "" {
Expand Down Expand Up @@ -64,7 +63,9 @@ func LoadHadoopConf(path string) HadoopConf {
}

// Namenodes returns the namenode hosts present in the configuration. The
// returned slice will be sorted and deduped.
// returned slice will be sorted and deduped. The values are loaded from
// fs.defaultFS (or the deprecated fs.default.name), or fields beginning with
// dfs.namenode.rpc-address.
func (conf HadoopConf) Namenodes() ([]string, error) {
nns := make(map[string]bool)
for key, value := range conf {
Expand All @@ -77,7 +78,7 @@ func (conf HadoopConf) Namenodes() ([]string, error) {
}

if len(nns) == 0 {
return nil, errUnresolvedNamenode
return nil, errNoNamenodesInConf
}

keys := make([]string, 0, len(nns))
Expand Down
Loading

0 comments on commit f4b29de

Please sign in to comment.