Fingerprint ingest processor #68415

Merged
merged 6 commits into from
Feb 9, 2021
@@ -0,0 +1,295 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ingest;

import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;

public final class FingerprintProcessor extends AbstractProcessor {

public static final String TYPE = "fingerprint";

static final byte[] DELIMITER = new byte[] { 0 };
static final byte[] TRUE_BYTES = new byte[] { 1 };
static final byte[] FALSE_BYTES = new byte[] { 2 };

private final List<String> fields;
private final String targetField;
private final ThreadLocal<Hasher> threadLocalHasher;
private final byte[] salt;
private final boolean ignoreMissing;

FingerprintProcessor(
String tag,
String description,
List<String> fields,
String targetField,
byte[] salt,
ThreadLocal<Hasher> threadLocalHasher,
boolean ignoreMissing
) {
super(tag, description);
this.fields = new ArrayList<>(fields);
this.fields.sort(Comparator.naturalOrder());
this.targetField = targetField;
this.threadLocalHasher = threadLocalHasher;
this.salt = salt;
this.ignoreMissing = ignoreMissing;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Hasher hasher = threadLocalHasher.get();
hasher.reset();
hasher.update(salt);

var values = new Stack<>();
for (int k = fields.size() - 1; k >= 0; k--) {
String field = fields.get(k);
Object value = ingestDocument.getFieldValue(field, Object.class, true);
if (value == null) {
if (ignoreMissing) {
continue;
} else {
throw new IllegalArgumentException("missing field [" + field + "] when calculating fingerprint");
}
}
values.push(value);
}

if (values.size() > 0) {
// iteratively traverse document fields
while (values.isEmpty() == false) {
var value = values.pop();
if (value instanceof List) {
var list = (List<?>) value;
for (int k = list.size() - 1; k >= 0; k--) {
values.push(list.get(k));
}
} else if (value instanceof Set) {
var set = (Set<?>) value;
// process set entries in consistent order
var setList = set.stream().sorted().collect(Collectors.toList());
for (int k = setList.size() - 1; k >= 0; k--) {
values.push(setList.get(k));
}
Member

Streams are super great, but they aren't quite as fast as foreach loops yet, and since this is performance-sensitive code, what do you think about doing:

Suggested change
-var setList = set.stream().sorted().collect(Collectors.toList());
-for (int k = setList.size() - 1; k >= 0; k--) {
-values.push(setList.get(k));
-}
+var setList = new ArrayList<?>(set);
+setList.sort(Comparator.naturalOrder());
+for (Object thing : setList) {
+values.push(thing);
+}

?

(I might be missing something about why you're using an indexed for loop rather than foreach, as I think the foreach compiles to the same bytecode, but I could totally be wrong)
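Note that Java rejects `new ArrayList<?>(set)`, because a wildcard cannot be used as the type argument when instantiating a class. A minimal stream-free sketch that compiles is shown below. `SortedSetCopy` and `sortedCopy` are hypothetical names used only for illustration and are not part of this PR, and the sketch assumes the set's elements are mutually comparable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not code from this PR.
final class SortedSetCopy {

    // Copies the set into a list and sorts it by natural order without streams.
    // Assumption: every element implements Comparable and the elements are
    // mutually comparable; otherwise sort() throws ClassCastException.
    @SuppressWarnings("unchecked")
    static List<Object> sortedCopy(Set<?> set) {
        List<Object> copy = new ArrayList<>(set);
        copy.sort((a, b) -> ((Comparable<Object>) a).compareTo(b));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sortedCopy(Set.of("b", "c", "a"))); // prints [a, b, c]
    }
}
```

The explicit comparator lambda sidesteps the type-inference problem of calling `Comparator.naturalOrder()` on a list whose element type is not known to be `Comparable`.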

Contributor Author

@danhermann, Feb 4, 2021

The for loop iterates backward over the sorted list to push the values onto the stack, so the values are processed in sorted order after being popped off the stack. That ordering is not strictly necessary, since only a stable hash is required, but having the values in sorted order is certainly nicer when debugging.

Contributor Author

I did replace the usage of streams both here and below.
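The ordering argument in this thread can be checked with a small standalone sketch. It is not code from the PR; `ReversePushOrder` and `popOrder` are hypothetical names. It compares pushing a sorted list onto a `java.util.Stack` back-to-front (as the indexed loop does) with pushing it front-to-back (as a plain foreach would).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

// Hypothetical demo class for illustration only; not code from this PR.
final class ReversePushOrder {

    // Pushes the items onto a stack, either back-to-front or front-to-back,
    // then returns them in the order they come off the stack.
    static List<Object> popOrder(List<?> sorted, boolean pushInReverse) {
        Stack<Object> stack = new Stack<>();
        if (pushInReverse) {
            for (int k = sorted.size() - 1; k >= 0; k--) {
                stack.push(sorted.get(k));
            }
        } else {
            for (Object item : sorted) {
                stack.push(item);
            }
        }
        List<Object> popped = new ArrayList<>();
        while (stack.isEmpty() == false) {
            popped.add(stack.pop());
        }
        return popped;
    }

    public static void main(String[] args) {
        List<String> sorted = List.of("a", "b", "c");
        System.out.println(popOrder(sorted, true));  // prints [a, b, c]
        System.out.println(popOrder(sorted, false)); // prints [c, b, a]
    }
}
```

Either push order gives a stable hash as long as it is applied consistently; the reverse push only makes the processing order match the sorted order.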

} else if (value instanceof Map) {
@SuppressWarnings("unchecked")
var map = (Map<String, Object>) value;
// process map entries in consistent order
final Comparator<Map.Entry<String, Object>> keyComparator = Map.Entry.comparingByKey(Comparator.naturalOrder());
var entryList = map.entrySet().stream().sorted(keyComparator).collect(Collectors.toList());
for (int k = entryList.size() - 1; k >= 0; k--) {
values.push(entryList.get(k));
}
} else if (value instanceof Map.Entry) {
var entry = (Map.Entry<?, ?>) value;
hasher.update(DELIMITER);
hasher.update(toBytes(entry.getKey()));
values.push(entry.getValue());
} else {
// feed them through digest.update
hasher.update(DELIMITER);
hasher.update(toBytes(value));
}
}

ingestDocument.setFieldValue(targetField, Base64.getEncoder().encodeToString(hasher.digest()));
}

return ingestDocument;
}

static byte[] toBytes(Object value) {
if (value instanceof String) {
return ((String) value).getBytes(StandardCharsets.UTF_8);
}
if (value instanceof byte[]) {
return (byte[]) value;
}
if (value instanceof Integer) {
byte[] intBytes = new byte[4];
ByteUtils.writeIntLE((Integer) value, intBytes, 0);
return intBytes;
}
if (value instanceof Long) {
byte[] longBytes = new byte[8];
ByteUtils.writeLongLE((Long) value, longBytes, 0);
return longBytes;
}
if (value instanceof Float) {
byte[] floatBytes = new byte[4];
ByteUtils.writeFloatLE((Float) value, floatBytes, 0);
return floatBytes;
}
if (value instanceof Double) {
byte[] doubleBytes = new byte[8];
ByteUtils.writeDoubleLE((Double) value, doubleBytes, 0);
return doubleBytes;
}
if (value instanceof Boolean) {
return (Boolean) value ? TRUE_BYTES : FALSE_BYTES;
}
if (value instanceof ZonedDateTime) {
ZonedDateTime zdt = (ZonedDateTime) value;
byte[] zoneIdBytes = zdt.getZone().getId().getBytes(StandardCharsets.UTF_8);
byte[] zdtBytes = new byte[32 + zoneIdBytes.length];
ByteUtils.writeIntLE(zdt.getYear(), zdtBytes, 0);
ByteUtils.writeIntLE(zdt.getMonthValue(), zdtBytes, 4);
ByteUtils.writeIntLE(zdt.getDayOfMonth(), zdtBytes, 8);
ByteUtils.writeIntLE(zdt.getHour(), zdtBytes, 12);
ByteUtils.writeIntLE(zdt.getMinute(), zdtBytes, 16);
ByteUtils.writeIntLE(zdt.getSecond(), zdtBytes, 20);
ByteUtils.writeIntLE(zdt.getNano(), zdtBytes, 24);
ByteUtils.writeIntLE(zdt.getOffset().getTotalSeconds(), zdtBytes, 28);
System.arraycopy(zoneIdBytes, 0, zdtBytes, 32, zoneIdBytes.length);
return zdtBytes;
}
if (value instanceof Date) {
byte[] dateBytes = new byte[8];
ByteUtils.writeLongLE(((Date) value).getTime(), dateBytes, 0);
return dateBytes;
}
if (value == null) {
return new byte[0];
}
throw new IllegalArgumentException("cannot convert object of type [" + value.getClass().getName() + "] to bytes");
}

public List<String> getFields() {
return fields;
}

public String getTargetField() {
return targetField;
}

public ThreadLocal<Hasher> getThreadLocalHasher() {
return threadLocalHasher;
}

public byte[] getSalt() {
return salt;
}

public boolean isIgnoreMissing() {
return ignoreMissing;
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements Processor.Factory {

public static final String[] SUPPORTED_DIGESTS = { "MD5", "SHA-1", "SHA-256", "SHA-512" };

static final String DEFAULT_TARGET = "fingerprint";
static final String DEFAULT_SALT = "";
static final String DEFAULT_METHOD = "SHA-1";

@Override
public FingerprintProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
List<String> fields = ConfigurationUtils.readList(TYPE, processorTag, config, "fields");
if (fields.size() < 1) {
throw newConfigurationException(TYPE, processorTag, "fields", "must specify at least one field");
}

String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET);
String salt = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "salt", DEFAULT_SALT);
byte[] saltBytes = salt.length() > 0 ? toBytes(salt) : new byte[0];
String method = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "method", DEFAULT_METHOD);
if (Arrays.asList(SUPPORTED_DIGESTS).contains(method) == false) {
throw newConfigurationException(TYPE, processorTag, "method", "[" + method + "] is not a supported hash method");
}
ThreadLocal<Hasher> threadLocalHasher = ThreadLocal.withInitial(() -> {
try {
return MessageDigestHasher.getInstance(method);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unexpected exception creating MessageDigest instance for [" + method + "]", e);
}
});
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);

return new FingerprintProcessor(processorTag, description, fields, targetField, saltBytes, threadLocalHasher, ignoreMissing);
}
}

// simple interface around MessageDigest to facilitate testing
public interface Hasher {

void reset();

void update(byte[] input);

byte[] digest();

String getAlgorithm();
}

static class MessageDigestHasher implements Hasher {

private final MessageDigest md;

private MessageDigestHasher(MessageDigest md) {
this.md = md;
}

static MessageDigestHasher getInstance(String method) throws NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance(method);
return new MessageDigestHasher(md);
}

@Override
public void reset() {
md.reset();
}

@Override
public void update(byte[] input) {
md.update(input);
}

@Override
public byte[] digest() {
return md.digest();
}

@Override
public String getAlgorithm() {
return md.getAlgorithm();
}
}
}
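For readers of this diff, the role of the `DELIMITER` byte written before each value can be illustrated with a standalone sketch that uses only `java.security.MessageDigest`. This is an assumption-laden simplification, not the processor itself: `DelimitedDigestSketch` and `fingerprint` are hypothetical names, and only string values are handled.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical sketch for illustration only; handles strings, unlike the processor's toBytes.
final class DelimitedDigestSketch {

    static final byte[] DELIMITER = new byte[] { 0 };

    // Hashes the salt followed by each value, optionally writing a delimiter
    // byte before every value, and returns the Base64-encoded digest.
    static String fingerprint(String method, String salt, boolean delimit, String... values) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance(method);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("unsupported digest [" + method + "]", e);
        }
        md.update(salt.getBytes(StandardCharsets.UTF_8));
        for (String value : values) {
            if (delimit) {
                md.update(DELIMITER);
            }
            md.update(value.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(md.digest());
    }

    public static void main(String[] args) {
        // Without a delimiter, ("ab", "c") and ("a", "bc") feed identical bytes to the digest.
        System.out.println(fingerprint("SHA-1", "", false, "ab", "c")
            .equals(fingerprint("SHA-1", "", false, "a", "bc"))); // prints true
        // With a delimiter, the value boundaries become part of the hashed input.
        System.out.println(fingerprint("SHA-1", "", true, "ab", "c")
            .equals(fingerprint("SHA-1", "", true, "a", "bc")));  // prints false
    }
}
```

Writing a delimiter before each value keeps different splits of the same characters from producing the same fingerprint.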
@@ -22,7 +22,9 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
 NetworkDirectionProcessor.TYPE,
 new NetworkDirectionProcessor.Factory(),
 CommunityIdProcessor.TYPE,
-new CommunityIdProcessor.Factory()
+new CommunityIdProcessor.Factory(),
+FingerprintProcessor.TYPE,
+new FingerprintProcessor.Factory()
 );
 }
 }