diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index 7da9d8613027f..088a3da08f051 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -226,7 +226,7 @@ public void testPrepareConflict() throws Exception { transportServiceIterable.forEach(ts -> ((MockTransportService) ts).addSendBehavior(new StubbableTransport.SendRequestBehavior() { @Override public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { - if (action.startsWith(ShardPrepareCommitAction.NAME)) { + if (action.equals(ShardPrepareCommitAction.NAME)) { txes.add(((ShardPrepareCommitRequest) request).txid()); new Thread(() -> { try { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index e5d12eecb0241..5a0907ef34d15 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; @@ -81,7 +80,6 @@ public class TransportShardBulkAction extends TransportWriteAction(listener) { private final Executor executor = threadPool.executor(executorName); @@ -189,11 +193,7 @@ public static void performOnPrimary( @Override protected void doRun() throws Exception { - TxID txID1 = TxID.create(); - Translog.Location[] transactionId = new Translog.Location[1]; try { - transactionId[0] = primary.startTransaction(txID1.id()); - transactionRegistry.registerTransaction(txID1, Set.of(transactionId[0].id())); while (context.hasMoreOperationsToExecute()) { if (executeBulkItemRequest( @@ -212,11 +212,12 @@ protected void doRun() throws Exception { assert context.isInitial(); // either completed and moved to next or reset } - primary.commitTransaction(transactionId); + primary.loggingComplete(request.txID(), transactionId[0]); + transactionId[0] = primary.commitTransaction(request.txID()); + primary.closeTransaction(transactionId); } catch (Exception x) { logger.warn("Encountered an error while executing bulk transaction", x); primary.rollbackTransaction(transactionId); - } finally { primary.closeTransaction(transactionId); } primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 902a09aa1d79a..12043ad805a5d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -81,6 +81,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; @@ -323,7 +324,8 @@ public Condition newCondition() { public abstract Translog.Location startTransaction(String id) throws IOException; - public abstract Translog.Location commitTransaction(Translog.Location prevId) throws IOException; + public abstract Translog.Location commitTransaction(Translog.Location prevId, + Function applier) throws IOException; public abstract Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException; @@ -1312,6 +1314,7 @@ public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionTy } public enum Origin { + TRANSACTION, PRIMARY, REPLICA, PEER_RECOVERY, diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index cf2d01f6f0141..4dc28bbcd3264 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2074,7 +2074,10 @@ public Translog.Location startTransaction(String id) throws IOException { } @Override - public Translog.Location commitTransaction(Translog.Location prevId) throws IOException { + public Translog.Location commitTransaction(Translog.Location prevId, Function applier) throws IOException { + Translog.Location commitLocation = translog.add( + new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId)); + Translog.Location loc = prevId; while (loc != null) { @@ -2087,6 +2090,8 @@ public Translog.Location commitTransaction(Translog.Location prevId) throws IOEx logger.info("Committing op " + op); if (op instanceof Translog.TransactionMember) { + // todo: lots of things here, but maybe this works for now... + applier.apply(op); loc = ((Translog.TransactionMember)op).getTransactionId(); } else if (op instanceof Translog.TxStart) { break; @@ -2096,8 +2101,7 @@ public Translog.Location commitTransaction(Translog.Location prevId) throws IOEx } } - return translog.add( - new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId)); + return commitLocation; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 55179a4575917..cde1d03566aac 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; @@ -533,7 +534,8 @@ public Translog.Location startTransaction(String id) throws IOException { } @Override - public Translog.Location commitTransaction(Translog.Location transactionId) throws IOException { + public Translog.Location commitTransaction(Translog.Location transactionId, + Function applier) throws IOException { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3660793ce3573..07d18ad40b63b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -162,7 +162,6 @@ import java.util.Collections; import java.util.Deque; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -887,9 +886,15 @@ public Translog.Location startTransaction(String id) throws IOException { return getEngine().startTransaction(id); } - public boolean commitTransaction(Translog.Location[] transactionId) throws IOException { - transactionId[0] = getEngine().commitTransaction(transactionId[0]); - return true; + public Translog.Location commitTransaction(TxID txID) throws IOException { + return getEngine().commitTransaction(transactionRegistry.translogHead(txID), operation -> { + try { + return applyTranslogOperation(operation, Engine.Operation.Origin.TRANSACTION); + } catch (IOException e) { + assert false; + throw new RuntimeException(e); + } + }); } public boolean rollbackTransaction(Translog.Location[] transactionId) throws IOException { @@ -1613,6 +1618,10 @@ static Engine.Searcher wrapSearcher( ); // completes stats recording } + public void loggingComplete(TxID txID, Translog.Location headOfTranslogList) { + transactionRegistry.loggingComplete(txID, headOfTranslogList); + } + private static final class NonClosingReaderWrapper extends FilterDirectoryReader { private NonClosingReaderWrapper(DirectoryReader in) throws IOException { @@ -2150,6 +2159,8 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn assert assertPrimaryMode(); } else if (origin == Engine.Operation.Origin.REPLICA) { assert assertReplicationTarget(); + } else if (origin == Engine.Operation.Origin.TRANSACTION) { + // empty } else { assert origin == Engine.Operation.Origin.LOCAL_RESET; assert getActiveOperationsCount() == OPERATIONS_BLOCKED @@ -4134,8 +4145,9 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() { return retentionLeaseSyncer; } - public void registerTransaction(TxID id, Set keys) { + public Translog.Location startTransaction(TxID id, Set keys) throws IOException { transactionRegistry.registerTransaction(id, keys); + return getEngine().startTransaction(id.id()); } public Map prepareCommit(TxID txID) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java b/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java index aa08ab6f09981..f4a742783a1d9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.shard; import org.elasticsearch.action.bulk.TxID; +import org.elasticsearch.index.translog.Translog; import java.util.HashMap; import java.util.HashSet; @@ -21,6 +22,8 @@ public class ShardTransactionRegistry { private final Map> byKey = new HashMap<>(); private final Map> byTxID = new HashMap<>(); private final Map> conflictingKeysByTxID = new HashMap<>(); + private final Map translogHeads = new HashMap<>(); + private final Set prepared = new HashSet<>(); // todo: less locking and perhaps totally different content... public synchronized void registerTransaction(TxID txID, Set ids) { @@ -37,6 +40,7 @@ public synchronized void registerTransaction(TxID txID, Set ids) { assert invariant(); } + public synchronized void releaseTransaction(TxID txID) { byTxID.remove(txID).forEach(id -> cleanByKey(id, txID)); prepared.remove(txID); @@ -44,6 +48,12 @@ public synchronized void releaseTransaction(TxID txID) { assert invariant(); } + public synchronized void loggingComplete(TxID txID, Translog.Location headOfTranslogList) { + assert translogHeads.containsKey(txID) == false; + + translogHeads.put(txID, headOfTranslogList); + } + public synchronized Map prepare(TxID txID) { assert byTxID.containsKey(txID); assert prepared.contains(txID) == false; @@ -99,4 +109,8 @@ Set keys(TxID txID) { public int size() { return byTxID.size(); } + + public Translog.Location translogHead(TxID txID) { + return translogHeads.get(txID); + } }