Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected IoTDBSyncClientManager constructClient(
protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(
final String fileName, final long position, final byte[] payLoad) {
throw new UnsupportedOperationException(
"The config region connector does not support transferring single file piece req.");
"The config region sink does not support transferring single file piece req.");
}

@Override
Expand All @@ -114,13 +114,13 @@ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
throw new UnsupportedOperationException(
"IoTDBConfigRegionConnector can't transfer TabletInsertionEvent.");
"IoTDBConfigRegionSink can't transfer TabletInsertionEvent.");
}

@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
throw new UnsupportedOperationException(
"IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent.");
"IoTDBConfigRegionSink can't transfer TsFileInsertionEvent.");
}

@Override
Expand All @@ -130,8 +130,7 @@ public void transfer(final Event event) throws Exception {
} else if (event instanceof PipeConfigRegionSnapshotEvent) {
doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn(
"IoTDBConfigRegionConnector does not support transferring generic event: {}.", event);
LOGGER.warn("IoTDBConfigRegionSink does not support transferring generic event: {}.", event);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,22 @@
// TODO: validate visibility for schema region and config region
final Visibility pipeVisibility =
VisibilityUtils.calculateFromExtractorParameters(new PipeParameters(sourceAttributes));
final Visibility extractorVisibility =
final Visibility sourceVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
final Visibility processorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryProcessor.getClass());
final Visibility connectorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryConnector.getClass());
if (!VisibilityUtils.isCompatible(
pipeVisibility, extractorVisibility, processorVisibility, connectorVisibility)) {
pipeVisibility, sourceVisibility, processorVisibility, connectorVisibility)) {
throw new PipeParameterNotValidException(
String.format(
"The visibility of the pipe (%s, %s) is not compatible with the visibility of the extractor (%s, %s, %s), processor (%s, %s, %s), and connector (%s, %s, %s).",
"The visibility of the pipe (%s, %s) is not compatible with the visibility of the source (%s, %s, %s), processor (%s, %s, %s), and connector (%s, %s, %s).",

Check warning on line 88 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 170).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ3XaQc4h9XdQlJorfoM&open=AZ3XaQc4h9XdQlJorfoM&pullRequest=17570
pipeName,
pipeVisibility,
sourceAttributes,
temporaryExtractor.getClass().getName(),
extractorVisibility,
sourceVisibility,
processorAttributes,
temporaryProcessor.getClass().getName(),
processorVisibility,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,32 @@ public class PipeDataNodeTask implements PipeTask {
private final String pipeName;
private final int regionId;

private final PipeTaskStage extractorStage;
private final PipeTaskStage sourceStage;
private final PipeTaskStage processorStage;
private final PipeTaskStage connectorStage;
private final PipeTaskStage sinkStage;

private volatile boolean isCompleted = false;

public PipeDataNodeTask(
final String pipeName,
final int regionId,
final PipeTaskStage extractorStage,
final PipeTaskStage sourceStage,
final PipeTaskStage processorStage,
final PipeTaskStage connectorStage) {
final PipeTaskStage sinkStage) {
this.pipeName = pipeName;
this.regionId = regionId;

this.extractorStage = extractorStage;
this.sourceStage = sourceStage;
this.processorStage = processorStage;
this.connectorStage = connectorStage;
this.sinkStage = sinkStage;
}

@Override
public void create() {
final long startTime = System.currentTimeMillis();
extractorStage.create();
sourceStage.create();
processorStage.create();
connectorStage.create();
sinkStage.create();
LOGGER.info(
"Create pipe DN task {} successfully within {} ms",
this,
Expand All @@ -67,9 +67,9 @@ public void create() {
@Override
public void drop() {
final long startTime = System.currentTimeMillis();
extractorStage.drop();
sourceStage.drop();
processorStage.drop();
connectorStage.drop();
sinkStage.drop();
LOGGER.info(
"Drop pipe DN task {} successfully within {} ms",
this,
Expand All @@ -79,9 +79,9 @@ public void drop() {
@Override
public void start() {
final long startTime = System.currentTimeMillis();
extractorStage.start();
sourceStage.start();
processorStage.start();
connectorStage.start();
sinkStage.start();
LOGGER.info(
"Start pipe DN task {} successfully within {} ms",
this,
Expand All @@ -91,9 +91,9 @@ public void start() {
@Override
public void stop() {
final long startTime = System.currentTimeMillis();
extractorStage.stop();
sourceStage.stop();
processorStage.stop();
connectorStage.stop();
sinkStage.stop();
LOGGER.info(
"Stop pipe DN task {} successfully within {} ms",
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,18 @@ public Map<Integer, PipeTask> buildTasksWithInternalSource() throws IllegalPathE
final PipeTaskMeta pipeTaskMeta = consensusGroupIdToPipeTaskMeta.getValue();

if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
final PipeParameters extractorParameters = pipeStaticMeta.getSourceParameters();
final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
dataRegionIds.contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
extractorParameters, dataRegionId);
sourceParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
&& SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
consensusGroupId, extractorParameters);
consensusGroupId, sourceParameters);

// Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks
// Advance the source parameters parsing logic to avoid creating un-relevant pipeTasks
if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
consensusGroupIdToPipeTaskMap.put(
consensusGroupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class PipeSinkSubtaskExecutor extends PipeSubtaskExecutor {
public PipeSinkSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL.getName() + "-" + id.get(),
ThreadName.PIPE_SINK_EXECUTOR_POOL.getName() + "-" + id.get(),
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName() + "-" + id.getAndIncrement(),
true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
* @param creationTime pipe creation time
* @param pipeProcessorParameters used to create {@link PipeProcessor}
* @param regionId {@link DataRegion} id
* @param pipeExtractorInputEventSupplier used to input {@link Event}s from {@link PipeExtractor}
* @param pipeConnectorOutputPendingQueue used to output {@link Event}s to {@link PipeConnector}
* @param pipeSourceInputEventSupplier used to input {@link Event}s from {@link PipeExtractor}
* @param pipeSinkOutputPendingQueue used to output {@link Event}s to {@link PipeConnector}
* @throws PipeException if failed to {@link PipeProcessor#validate(PipeParameterValidator)} or
* {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)}}
*/
Expand All @@ -66,8 +66,8 @@ public PipeTaskProcessorStage(
final long creationTime,
final PipeParameters pipeProcessorParameters,
final int regionId,
final EventSupplier pipeExtractorInputEventSupplier,
final UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
final EventSupplier pipeSourceInputEventSupplier,
final UnboundedBlockingPendingQueue<Event> pipeSinkOutputPendingQueue,
final PipeProcessorSubtaskExecutor executor,
final PipeTaskMeta pipeTaskMeta,
final boolean forceTabletFormat,
Expand Down Expand Up @@ -103,9 +103,9 @@ public PipeTaskProcessorStage(
// old one, so we need creationTime to make their hash code different in the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final boolean isUsedForConsensusPipe = pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
final PipeEventCollector pipeConnectorOutputEventCollector =
final PipeEventCollector pipeSinkOutputEventCollector =
new PipeEventCollector(
pipeConnectorOutputPendingQueue,
pipeSinkOutputPendingQueue,
creationTime,
regionId,
forceTabletFormat,
Expand All @@ -117,9 +117,9 @@ public PipeTaskProcessorStage(
pipeName,
creationTime,
regionId,
pipeExtractorInputEventSupplier,
pipeSourceInputEventSupplier,
pipeProcessor,
pipeConnectorOutputEventCollector);
pipeSinkOutputEventCollector);

this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;

protected String connectorSubtaskId;
protected String sinkSubtaskId;

public PipeTaskSinkStage(
String pipeName,
Expand All @@ -56,7 +56,7 @@ public PipeTaskSinkStage(
}

protected void registerSubtask() {
this.connectorSubtaskId =
this.sinkSubtaskId =
PipeSinkSubtaskManager.instance()
.register(
executor,
Expand All @@ -71,21 +71,20 @@ public void createSubtask() throws PipeException {

@Override
public void startSubtask() throws PipeException {
PipeSinkSubtaskManager.instance().start(connectorSubtaskId);
PipeSinkSubtaskManager.instance().start(sinkSubtaskId);
}

@Override
public void stopSubtask() throws PipeException {
PipeSinkSubtaskManager.instance().stop(connectorSubtaskId);
PipeSinkSubtaskManager.instance().stop(sinkSubtaskId);
}

@Override
public void dropSubtask() throws PipeException {
PipeSinkSubtaskManager.instance()
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
PipeSinkSubtaskManager.instance().deregister(pipeName, creationTime, regionId, sinkSubtaskId);
}

public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
return PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
return PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(sinkSubtaskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,32 @@ public class PipeTaskSourceStage extends PipeTaskStage {
public PipeTaskSourceStage(
String pipeName,
long creationTime,
PipeParameters extractorParameters,
PipeParameters sourceParameters,
int regionId,
PipeTaskMeta pipeTaskMeta) {
pipeExtractor =
StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
|| PipeRuntimeMeta.isSourceExternal(regionId)
? PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
: PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
? PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
: PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);

// Validate and customize should be called before createSubtask. this allows extractor exposing
// Validate and customize should be called before createSubtask. this allows source exposing
// exceptions in advance.
try {
// 1. Validate extractor parameters
pipeExtractor.validate(new PipeParameterValidator(extractorParameters));
// 1. Validate source parameters
pipeExtractor.validate(new PipeParameterValidator(sourceParameters));

// 2. Customize extractor
// 2. Customize source
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime, regionId, pipeTaskMeta));
pipeExtractor.customize(extractorParameters, runtimeConfiguration);
pipeExtractor.customize(sourceParameters, runtimeConfiguration);
} catch (Exception e) {
try {
pipeExtractor.close();
} catch (Exception closeException) {
LOGGER.warn(
"Failed to close extractor after failed to initialize extractor. "
+ "Ignore this exception.",
"Failed to close source after failed to initialize source. " + "Ignore this exception.",
closeException);
}
throw new PipeException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
throw new UnsupportedOperationException(
"IoTDBSchemaRegionAirGapConnector can't transfer TabletInsertionEvent.");
"IoTDBSchemaRegionAirGapSink can't transfer TabletInsertionEvent.");
}

@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
throw new UnsupportedOperationException(
"IoTDBSchemaRegionAirGapConnector can't transfer TsFileInsertionEvent.");
"IoTDBSchemaRegionAirGapSink can't transfer TsFileInsertionEvent.");
}

@Override
Expand All @@ -73,8 +73,7 @@ public void transfer(final Event event) throws Exception {
doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn(
"IoTDBSchemaRegionAirGapConnector does not support transferring generic event: {}.",
event);
"IoTDBSchemaRegionAirGapSink does not support transferring generic event: {}.", event);
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public PipeTransferTabletBatchEventHandler(

public void transfer(final AsyncPipeDataTransferServiceClient client) throws TException {
for (final Map.Entry<Pair<String, Long>, Long> entry : pipeName2BytesAccumulated.entrySet()) {
connector.rateLimitIfNeeded(
sink.rateLimitIfNeeded(
entry.getKey().getLeft(),
entry.getKey().getRight(),
client.getEndPoint(),
Expand All @@ -92,13 +92,11 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) {
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
connector
.statusHandler()
.handle(status, response.getStatus().getMessage(), events.toString());
sink.statusHandler().handle(status, response.getStatus().getMessage(), events.toString());
}
for (final Pair<String, TEndPoint> redirectPair :
LeaderCacheUtils.parseRecommendedRedirections(status)) {
connector.updateLeaderCache(redirectPair.getLeft(), redirectPair.getRight());
sink.updateLeaderCache(redirectPair.getLeft(), redirectPair.getRight());
}

events.forEach(
Expand All @@ -123,7 +121,7 @@ protected void onErrorInternal(final Exception exception) {
events.size(),
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
connector.addFailureEventsToRetryQueue(events, exception);
sink.addFailureEventsToRetryQueue(events, exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected void doTransfer(

@Override
protected void updateLeaderCache(final TSStatus status) {
connector.updateLeaderCache(
sink.updateLeaderCache(
((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode());
}
}
Loading
Loading