[ARCTIC-1025][FLINK] Fix data duplication on primary key tables with upsert enabled #1180

Closed
wants to merge 4 commits into from
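
Summary of the change: when upsert is enabled the file writer rewrites every INSERT as a DELETE followed by an INSERT, so the previous version of a primary key is removed before the new one is appended. An UPDATE_AFTER that arrives without a preceding UPDATE_BEFORE for the same key used to bypass that branch and was appended as-is, leaving the old row behind and duplicating data. The sketch below restates the normalization this PR adds in ArcticFileWriter#processMultiUpdateAfter; it is a simplified, self-contained illustration, and the keyOf(...) function stands in for the new PartitionPrimaryKeyHelper#key(RowData) shown in the diff.

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Simplified sketch of the changelog normalization added by this PR.
// KEY is a placeholder for PrimaryKeyData; keyOf(row) stands in for
// PartitionPrimaryKeyHelper#key(RowData), which extracts a copy of the
// row's primary key.
class UpsertChangelogNormalizer<KEY> {
  private final Set<KEY> hasUpdateBeforeKeys = new HashSet<>();
  private final Function<RowData, KEY> keyOf;

  UpsertChangelogNormalizer(Function<RowData, KEY> keyOf) {
    this.keyOf = keyOf;
  }

  void normalize(RowData row) {
    KEY key = keyOf.apply(row);
    // An UPDATE_AFTER with no pending UPDATE_BEFORE for the same key would be
    // appended as-is and duplicate the previous version of that key; treat it
    // as an INSERT so the upsert branch writes a DELETE for the old row first.
    if (row.getRowKind() == RowKind.UPDATE_AFTER && !hasUpdateBeforeKeys.contains(key)) {
      row.setRowKind(RowKind.INSERT);
    }
    if (row.getRowKind() == RowKind.UPDATE_BEFORE) {
      hasUpdateBeforeKeys.add(key);      // the matching UPDATE_AFTER is expected next
    } else {
      hasUpdateBeforeKeys.remove(key);   // the pair is complete, or was never open
    }
  }
}
```

With this in place, repeated UPDATE_AFTER records for the same key each replace the previous version instead of accumulating duplicate rows.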
@@ -36,7 +36,7 @@
/**
* This helper operates on one Arctic table and the data of that table.
*/
public class ShuffleHelper implements Serializable {
public class PartitionPrimaryKeyHelper implements Serializable {
private static final long serialVersionUID = 1L;

private boolean primaryKeyExist = false;
@@ -46,9 +46,9 @@ public class ShuffleHelper implements Serializable {
private Types.StructType struct;
private transient RowDataWrapper rowDataWrapper;

public static ShuffleHelper EMPTY = new ShuffleHelper();
public static PartitionPrimaryKeyHelper EMPTY = new PartitionPrimaryKeyHelper();

public static ShuffleHelper build(ArcticTable table, Schema schema, RowType rowType) {
public static PartitionPrimaryKeyHelper build(ArcticTable table, Schema schema, RowType rowType) {
PartitionKey partitionKey = null;

if (table.spec() != null && !CollectionUtil.isNullOrEmpty(table.spec().fields())) {
@@ -57,12 +57,12 @@ public static ShuffleHelper build(ArcticTable table, Schema schema, RowType rowT
schema = addFieldsNotInArctic(schema, rowType);

if (table.isUnkeyedTable()) {
return new ShuffleHelper(rowType, schema.asStruct(), partitionKey);
return new PartitionPrimaryKeyHelper(rowType, schema.asStruct(), partitionKey);
}

KeyedTable keyedTable = table.asKeyedTable();
PrimaryKeyData primaryKeyData = new PrimaryKeyData(keyedTable.primaryKeySpec(), schema);
return new ShuffleHelper(keyedTable.primaryKeySpec().primaryKeyExisted(),
return new PartitionPrimaryKeyHelper(keyedTable.primaryKeySpec().primaryKeyExisted(),
primaryKeyData, partitionKey, rowType, schema.asStruct());
}

@@ -101,21 +101,25 @@ public void open() {
}
}

public ShuffleHelper() {
public PartitionPrimaryKeyHelper() {
}

public ShuffleHelper(RowType rowType, Types.StructType structType,
PartitionKey partitionKey) {
public PartitionPrimaryKeyHelper(RowType rowType, Types.StructType structType,
PartitionKey partitionKey) {
this(false, null, partitionKey, rowType, structType);
}

public ShuffleHelper(boolean primaryKeyExist, PrimaryKeyData primaryKeyData,
PartitionKey partitionKey, RowType rowType, Types.StructType structType) {
public PartitionPrimaryKeyHelper(boolean primaryKeyExist, PrimaryKeyData primaryKeyData,
PartitionKey partitionKey, RowType rowType, Types.StructType structType) {
this(primaryKeyExist, primaryKeyData, null, partitionKey, rowType, structType);
}

public ShuffleHelper(boolean primaryKeyExist, PrimaryKeyData primaryKeyData, RowDataWrapper rowDataWrapper,
PartitionKey partitionKey, RowType rowType, Types.StructType structType) {
public PartitionPrimaryKeyHelper(boolean primaryKeyExist,
PrimaryKeyData primaryKeyData,
RowDataWrapper rowDataWrapper,
PartitionKey partitionKey,
RowType rowType,
Types.StructType structType) {
this.primaryKeyExist = primaryKeyExist;
this.primaryKeyData = primaryKeyData;
this.rowDataWrapper = rowDataWrapper;
@@ -142,4 +146,12 @@ public int hashKeyValue(RowData rowData) {
primaryKeyData.primaryKey(rowDataWrapper.wrap(rowData));
return primaryKeyData.hashCode();
}

public PrimaryKeyData key(RowData rowData) {
if (primaryKeyData == null) {
return null;
}
primaryKeyData.primaryKey(rowDataWrapper.wrap(rowData));
return primaryKeyData.copy();
}
}
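
Note on the new key(RowData) accessor above: it reuses an internal PrimaryKeyData buffer and returns a copy, so callers that need to remember keys across rows (as ArcticFileWriter does below) should hold on to the returned copy, and open() has to be called first so the RowDataWrapper exists. A minimal usage sketch, where table, schema, rowType and the two rows are assumed to come from the surrounding job and the table is assumed to be keyed:

```java
// Hypothetical usage of the renamed helper; rowA and rowB are placeholder RowData values.
PartitionPrimaryKeyHelper helper = PartitionPrimaryKeyHelper.build(table, schema, rowType);
helper.open();                                      // initializes the RowDataWrapper

Set<PrimaryKeyData> seen = new HashSet<>();
seen.add(helper.key(rowA));                         // a copy, safe to keep in the set
boolean sameKey = seen.contains(helper.key(rowB));  // equality by primary key values,
                                                    // the same assumption the HashSet in
                                                    // ArcticFileWriter relies on
```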
@@ -37,15 +37,15 @@
public class ReadShuffleRulePolicy implements ShuffleRulePolicy<RowData, ShuffleKey> {
private static final Logger LOG = LoggerFactory.getLogger(ReadShuffleRulePolicy.class);

private final ShuffleHelper helper;
private final PartitionPrimaryKeyHelper helper;

private final DistributionHashMode distributionHashMode;

public ReadShuffleRulePolicy(ShuffleHelper helper) {
public ReadShuffleRulePolicy(PartitionPrimaryKeyHelper helper) {
this(helper, DistributionHashMode.autoSelect(helper.isPrimaryKeyExist(), helper.isPartitionKeyExist()));
}

public ReadShuffleRulePolicy(ShuffleHelper helper,
public ReadShuffleRulePolicy(PartitionPrimaryKeyHelper helper,
DistributionHashMode distributionHashMode) {
this.helper = helper;
this.distributionHashMode = distributionHashMode;
@@ -81,11 +81,11 @@ public ShuffleKey getKey(RowData value) throws Exception {
* Round-robin polling that feeds a StreamRecord into a specific factor node.
*/
static class RoundRobinPartitioner implements Partitioner<ShuffleKey> {
private final ShuffleHelper helper;
private final PartitionPrimaryKeyHelper helper;
private final DistributionHashMode distributionHashMode;
private Random random = null;

RoundRobinPartitioner(DistributionHashMode distributionHashMode, ShuffleHelper helper) {
RoundRobinPartitioner(DistributionHashMode distributionHashMode, PartitionPrimaryKeyHelper helper) {
this.distributionHashMode = distributionHashMode;
this.helper = helper;
if (!distributionHashMode.isSupportPartition() && !distributionHashMode.isSupportPrimaryKey()) {
@@ -47,7 +47,7 @@
public class RoundRobinShuffleRulePolicy implements ShuffleRulePolicy<RowData, ShuffleKey> {
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinShuffleRulePolicy.class);

private final ShuffleHelper helper;
private final PartitionPrimaryKeyHelper helper;

private final int downStreamOperatorParallelism;

@@ -64,14 +64,14 @@ public RoundRobinShuffleRulePolicy(int downStreamOperatorParallelism,
this(null, downStreamOperatorParallelism, fileSplit);
}

public RoundRobinShuffleRulePolicy(ShuffleHelper helper,
public RoundRobinShuffleRulePolicy(PartitionPrimaryKeyHelper helper,
int downStreamOperatorParallelism,
int fileSplit) {
this(helper, downStreamOperatorParallelism, fileSplit,
DistributionHashMode.autoSelect(helper.isPrimaryKeyExist(), helper.isPartitionKeyExist()));
}

public RoundRobinShuffleRulePolicy(ShuffleHelper helper,
public RoundRobinShuffleRulePolicy(PartitionPrimaryKeyHelper helper,
int downStreamOperatorParallelism,
int fileSplit, DistributionHashMode distributionHashMode) {
this.helper = helper;
@@ -176,11 +176,11 @@ public ShuffleKey getKey(RowData value) throws Exception {
static class RoundRobinPartitioner implements Partitioner<ShuffleKey> {
private final int downStreamOperatorParallelism;
private final int factor;
private final ShuffleHelper helper;
private final PartitionPrimaryKeyHelper helper;
private final DistributionHashMode distributionHashMode;

RoundRobinPartitioner(int downStreamOperatorParallelism, int factor,
DistributionHashMode distributionHashMode, ShuffleHelper helper) {
DistributionHashMode distributionHashMode, PartitionPrimaryKeyHelper helper) {
this.downStreamOperatorParallelism = downStreamOperatorParallelism;
this.factor = factor;
this.distributionHashMode = distributionHashMode;
@@ -20,7 +20,7 @@

import com.netease.arctic.flink.metric.MetricsGenerator;
import com.netease.arctic.flink.shuffle.LogRecordV1;
import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.shuffle.PartitionPrimaryKeyHelper;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.write.ArcticLogWriter;
@@ -145,7 +145,7 @@ public static ArcticLogWriter buildArcticLogWriter(Map<String, String> propertie
@Nullable String topic,
TableSchema tableSchema,
String arcticEmitMode,
ShuffleHelper helper,
PartitionPrimaryKeyHelper helper,
ArcticTableLoader tableLoader,
Duration watermarkWriteGap) {
if (!arcticWALWriterEnable(properties, arcticEmitMode)) {
@@ -19,6 +19,8 @@
package com.netease.arctic.flink.write;

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.data.PrimaryKeyData;
import com.netease.arctic.flink.shuffle.PartitionPrimaryKeyHelper;
import com.netease.arctic.flink.shuffle.ShuffleKey;
import com.netease.arctic.flink.shuffle.ShuffleRulePolicy;
import com.netease.arctic.flink.table.ArcticTableLoader;
@@ -45,6 +47,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -65,6 +68,7 @@ public class ArcticFileWriter extends AbstractStreamOperator<WriteResult>
private final ArcticTableLoader tableLoader;
private final boolean upsert;
private final boolean submitEmptySnapshot;
private final PartitionPrimaryKeyHelper primaryKeyHelper;

private transient org.apache.iceberg.io.TaskWriter<RowData> writer;
private transient int subTaskId;
@@ -78,14 +82,19 @@ public class ArcticFileWriter extends AbstractStreamOperator<WriteResult>
* if Arctic's table is KERBEROS enabled. It will cause ugi relevant exception when deploy to yarn cluster.
*/
private transient ArcticTable table;
/**
* Tracks the primary keys whose most recent change was an UPDATE_BEFORE, used to decide whether an UPDATE_AFTER is preceded by one when upsert is enabled.
*/
private Set<PrimaryKeyData> hasUpdateBeforeKeys = new HashSet<>();

public ArcticFileWriter(
ShuffleRulePolicy<RowData, ShuffleKey> shuffleRule,
TaskWriterFactory<RowData> taskWriterFactory,
int minFileSplitCount,
ArcticTableLoader tableLoader,
boolean upsert,
boolean submitEmptySnapshot) {
boolean submitEmptySnapshot,
PartitionPrimaryKeyHelper primaryKeyHelper) {
this.shuffleRule = shuffleRule;
this.taskWriterFactory = taskWriterFactory;
this.minFileSplitCount = minFileSplitCount;
@@ -94,6 +103,7 @@ public ArcticFileWriter(
this.submitEmptySnapshot = submitEmptySnapshot;
LOG.info("ArcticFileWriter is created with minFileSplitCount: {}, upsert: {}, submitEmptySnapshot: {}",
minFileSplitCount, upsert, submitEmptySnapshot);
this.primaryKeyHelper = primaryKeyHelper;
}

@Override
Expand All @@ -106,6 +116,7 @@ public void open() {
initTaskWriterFactory(mask);

this.writer = table.io().doAs(taskWriterFactory::create);
primaryKeyHelper.open();
}

@Override
Expand All @@ -119,7 +130,7 @@ public void initializeState(StateInitializationContext context) throws Exception
new ListStateDescriptor<>(
subTaskId + "-task-file-writer-state",
LongSerializer.INSTANCE));

if (context.isRestored()) {
// get the last successful checkpoint id from state when failing over repeatedly
checkpointId = checkpointState.get().iterator().next();
@@ -135,7 +146,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
checkpointState.clear();
checkpointState.add(context.getCheckpointId());
}

private void initTaskWriterFactory(long mask) {
if (taskWriterFactory instanceof ArcticRowDataTaskWriterFactory) {
((ArcticRowDataTaskWriterFactory) taskWriterFactory).setMask(mask);
@@ -201,7 +212,7 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
if (writer == null) {
this.writer = taskWriterFactory.create();
}

processMultiUpdateAfter(row);
if (upsert && RowKind.INSERT.equals(row.getRowKind())) {
row.setRowKind(RowKind.DELETE);
writer.write(row);
@@ -213,6 +224,22 @@
});
}

/**
* Turns an UPDATE_AFTER into an INSERT when it is not preceded by an UPDATE_BEFORE for the same key.
*/
private void processMultiUpdateAfter(RowData row) {
PrimaryKeyData key = primaryKeyHelper.key(row);

if (RowKind.UPDATE_AFTER.equals(row.getRowKind()) && !hasUpdateBeforeKeys.contains(key)) {
row.setRowKind(RowKind.INSERT);
}
if (RowKind.UPDATE_BEFORE.equals(row.getRowKind())) {
hasUpdateBeforeKeys.add(key);
} else {
hasUpdateBeforeKeys.remove(key);
}
}

@Override
public void dispose() throws Exception {
super.dispose();
@@ -237,7 +264,7 @@ private void emit(WriteResult writeResult) {
*
* @param writeResult the WriteResult to emit
* @return true if the WriteResult should be emitted, or the WriteResult isn't empty,
* false only if the WriteResult is empty and the submitEmptySnapshot is false.
* false only if the WriteResult is empty and the submitEmptySnapshot is false.
*/
private boolean shouldEmit(WriteResult writeResult) {
return submitEmptySnapshot || (writeResult != null &&
@@ -18,7 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.shuffle.PartitionPrimaryKeyHelper;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.write.hidden.HiddenLogWriter;
import com.netease.arctic.flink.write.hidden.LogMsgFactory;
@@ -52,7 +52,7 @@ public AutomaticLogWriter(
LogMsgFactory<RowData> factory,
LogData.FieldGetterFactory<RowData> fieldGetterFactory,
byte[] jobId,
ShuffleHelper helper,
PartitionPrimaryKeyHelper helper,
ArcticTableLoader tableLoader,
Duration writeLogstoreWatermarkGap) {
this.arcticLogWriter =
@@ -19,8 +19,8 @@
package com.netease.arctic.flink.write;

import com.netease.arctic.flink.metric.MetricsGenerator;
import com.netease.arctic.flink.shuffle.PartitionPrimaryKeyHelper;
import com.netease.arctic.flink.shuffle.RoundRobinShuffleRulePolicy;
import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.shuffle.ShuffleKey;
import com.netease.arctic.flink.shuffle.ShuffleRulePolicy;
import com.netease.arctic.flink.table.ArcticTableLoader;
@@ -177,7 +177,7 @@ public DataStreamSink<?> build() {

DistributionHashMode distributionMode = getDistributionHashMode();
LOG.info("take effect distribute mode: {}", distributionMode);
ShuffleHelper helper = ShuffleHelper.build(table, writeSchema, flinkSchemaRowType);
PartitionPrimaryKeyHelper helper = PartitionPrimaryKeyHelper.build(table, writeSchema, flinkSchemaRowType);

ShuffleRulePolicy<RowData, ShuffleKey>
shufflePolicy = buildShuffleRulePolicy(helper, writeOperatorParallelism, distributionMode, overwrite, table);
@@ -196,7 +196,7 @@ public DataStreamSink<?> build() {
final Duration watermarkWriteGap = config.get(AUTO_EMIT_LOGSTORE_WATERMARK_GAP);

ArcticFileWriter fileWriter = createFileWriter(table, shufflePolicy, overwrite, flinkSchemaRowType,
arcticEmitMode, tableLoader);
arcticEmitMode, tableLoader, writeSchema);

ArcticLogWriter logWriter = ArcticUtils.buildArcticLogWriter(table.properties(),
producerConfig, topic, flinkSchema, arcticEmitMode, helper, tableLoader, watermarkWriteGap);
@@ -256,7 +256,7 @@ private DistributionHashMode getDistributionHashMode() {

@Nullable
public static ShuffleRulePolicy<RowData, ShuffleKey> buildShuffleRulePolicy(
ShuffleHelper helper,
PartitionPrimaryKeyHelper helper,
int writeOperatorParallelism,
DistributionHashMode distributionHashMode,
boolean overwrite,
@@ -298,16 +298,19 @@ public static ArcticFileWriter createFileWriter(ArcticTable arcticTable,
boolean overwrite,
RowType flinkSchema,
ArcticTableLoader tableLoader) {
Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)),
arcticTable.schema());
return createFileWriter(arcticTable, shufflePolicy, overwrite, flinkSchema, ARCTIC_EMIT_FILE,
tableLoader);
tableLoader, writeSchema);
}

public static ArcticFileWriter createFileWriter(ArcticTable arcticTable,
ShuffleRulePolicy shufflePolicy,
boolean overwrite,
RowType flinkSchema,
String emitMode,
ArcticTableLoader tableLoader) {
ArcticTableLoader tableLoader,
Schema writeSchema) {
if (!ArcticUtils.arcticFileWriterEnable(emitMode)) {
return null;
}
@@ -331,7 +334,8 @@ public static ArcticFileWriter createFileWriter(ArcticTable arcticTable,
minFileSplitCount,
tableLoader,
upsert,
submitEmptySnapshot);
submitEmptySnapshot,
PartitionPrimaryKeyHelper.build(arcticTable, writeSchema, flinkSchema));
}

private static TaskWriterFactory<RowData> createTaskWriterFactory(ArcticTable arcticTable,