Skip to content

Commit

Permalink
[backend] change how we check if need to fetch rss and csv feed (Open…
Browse files Browse the repository at this point in the history
  • Loading branch information
lndrtrbn authored Feb 20, 2025
1 parent ece6c25 commit 51e68fe
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions opencti-platform/opencti-graphql/src/manager/ingestionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,19 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
// If ingestion have remaining messages in the queue, or if last execution was done before RSS_FEED_MIN_INTERVAL_MINUTES minutes, dont fetch any new data
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0 && shouldExecuteIngestion(ingestion, RSS_FEED_MIN_INTERVAL_MINUTES)) {
// If ingestion have remaining messages in the queue dont fetch any new data
if (messages_number > 0) {
logApp.info(`[OPENCTI-MODULE] INGESTION Rss, skipping ${ingestion.name} - queue already filled with messages (${messages_number})`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
// If last execution was done before RSS_FEED_MIN_INTERVAL_MINUTES minutes, dont fetch any new data
} else if (!shouldExecuteIngestion(ingestion, RSS_FEED_MIN_INTERVAL_MINUTES)) {
logApp.info(`[OPENCTI-MODULE] INGESTION Rss, skipping ${ingestion.name} - last run is more recent than ${RSS_FEED_MIN_INTERVAL_MINUTES} minutes.`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { messages_size });
ingestionPromises.push(ingestionPromise);
// If no message in queue and last execution is old enough, fetch new data
} else {
const ingestionPromise = rssDataHandler(context, httpGet, turndownService, ingestion)
.catch((e) => {
logApp.warn('[OPENCTI-MODULE] INGESTION - RSS ingestion execution', { cause: e, name: ingestion.name });
Expand All @@ -293,11 +303,6 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
patchRssIngestion(context, SYSTEM_USER, ingestion.internal_id, { last_execution_date: now() }).catch((reason) => logApp.error('ERROR', { cause: reason }));
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
logApp.info(`[OPENCTI-MODULE] INGESTION Rss, skipping ${ingestion.name} - queue already filled with messages (${messages_number}) or last run is more recent than ${RSS_FEED_MIN_INTERVAL_MINUTES} minutes.`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
}
}
return Promise.all(ingestionPromises);
Expand Down Expand Up @@ -558,19 +563,26 @@ const csvExecutor = async (context: AuthContext) => {
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0 && shouldExecuteIngestion(ingestion, CSV_FEED_MIN_INTERVAL_MINUTES)) {

// If ingestion have remaining messages in the queue dont fetch any new data
if (messages_number > 0) {
logApp.info(`[OPENCTI-MODULE] INGESTION Csv, skipping ${ingestion.name} - queue already filled with messages (${messages_number})`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
// If last execution was done before CSV_FEED_MIN_INTERVAL_MINUTES minutes, dont fetch any new data
} else if (!shouldExecuteIngestion(ingestion, CSV_FEED_MIN_INTERVAL_MINUTES)) {
logApp.info(`[OPENCTI-MODULE] INGESTION Csv, skipping ${ingestion.name} - last run is more recent than ${CSV_FEED_MIN_INTERVAL_MINUTES} minutes.`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { messages_size });
ingestionPromises.push(ingestionPromise);
// If no message in queue and last execution is old enough, fetch new data
} else {
const ingestionPromise = csvDataHandler(context, ingestion)
.catch((e) => {
logApp.warn('[OPENCTI-MODULE] INGESTION - CSV ingestion execution', { cause: e, name: ingestion.name });
logApp.warn('[OPENCTI-MODULE] INGESTION - Csv ingestion execution', { cause: e, name: ingestion.name });
// In case of error we need also to take in account the min_interval_minutes with last_execution_date update.
patchCsvIngestion(context, SYSTEM_USER, ingestion.internal_id, { last_execution_date: now() }).catch((reason) => logApp.error('ERROR', { cause: reason }));
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
logApp.info(`[OPENCTI-MODULE] INGESTION csv, skipping ${ingestion.name} - queue already filled with messages (${messages_number}) or last run is more recent than ${RSS_FEED_MIN_INTERVAL_MINUTES} minutes.`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
}
}
return Promise.all(ingestionPromises);
Expand Down

0 comments on commit 51e68fe

Please sign in to comment.