diff --git a/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvCreation.tsx b/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvCreation.tsx index d128c00ea314..04392a6010cd 100644 --- a/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvCreation.tsx +++ b/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvCreation.tsx @@ -99,7 +99,6 @@ const IngestionCsvCreation: FunctionComponent = ({ pa } else if (values.authentication_type === 'certificate') { authenticationValue = `${values.cert}:${values.key}:${values.ca}`; } - const userId = typeof values.user_id === 'string' ? values.user_id : values.user_id.value; const input = { name: values.name, description: values.description, @@ -108,7 +107,7 @@ const IngestionCsvCreation: FunctionComponent = ({ pa authentication_type: values.authentication_type, authentication_value: authenticationValue, current_state_date: values.current_state_date, - user_id: userId, + user_id: typeof values.user_id === 'string' ? values.user_id : values.user_id.value, }; commit({ variables: { @@ -330,8 +329,7 @@ const IngestionCsvCreation: FunctionComponent = ({ pa setOpen(false)} - uri={values.uri} - csvMapperId={values.csv_mapper_id} + values={values} setIsCreateDisabled={setIsCreateDisabled} /> diff --git a/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvEdition.tsx b/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvEdition.tsx index affde37f71d9..dc7050f16cbe 100644 --- a/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvEdition.tsx +++ b/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvEdition.tsx @@ -361,8 +361,7 @@ const IngestionCsvEdition: FunctionComponent = ({ setOpen(false)} - uri={values.uri} - csvMapperId={values.csv_mapper_id} + values={values} /> )} diff --git a/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvMapperTestDialog.tsx b/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvMapperTestDialog.tsx index 50e08ffb995f..f199b85b778c 100644 --- a/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvMapperTestDialog.tsx +++ b/opencti-platform/opencti-front/src/private/components/data/ingestionCsv/IngestionCsvMapperTestDialog.tsx @@ -1,4 +1,4 @@ -import { graphql } from 'react-relay'; +import { graphql, useMutation } from 'react-relay'; import React, { FunctionComponent, useState } from 'react'; import Dialog from '@mui/material/Dialog'; import DialogTitle from '@mui/material/DialogTitle'; @@ -6,17 +6,16 @@ import DialogContent from '@mui/material/DialogContent'; import Button from '@mui/material/Button'; import Box from '@mui/material/Box'; import CodeBlock from '@components/common/CodeBlock'; -import { IngestionCsvMapperTestDialogQuery$data } from '@components/data/ingestionCsv/__generated__/IngestionCsvMapperTestDialogQuery.graphql'; -import { Option } from '@components/common/form/ReferenceField'; -import TextField from '@mui/material/TextField'; import Alert from '@mui/material/Alert'; +import { Option } from '@components/common/form/ReferenceField'; +import { IngestionCsvMapperTestDialogMutation$data } from '@components/data/ingestionCsv/__generated__/IngestionCsvMapperTestDialogMutation.graphql'; import Loader, { LoaderVariant } from '../../../../components/Loader'; import { useFormatter } from '../../../../components/i18n'; -import { fetchQuery, handleError } from '../../../../relay/environment'; +import { handleError } from '../../../../relay/environment'; -const ingestionCsvMapperTestQuery = graphql` - query IngestionCsvMapperTestDialogQuery($uri: String!, $csv_mapper_id: String!) { - test_mapper(uri: $uri, csv_mapper_id: $csv_mapper_id) { +const ingestionCsvMapperTestMutation = graphql` + mutation IngestionCsvMapperTestDialogMutation($input: IngestionCsvAddInput!) { + ingestionCsvTester(input: $input) { nbEntities nbRelationships objects @@ -27,20 +26,29 @@ const ingestionCsvMapperTestQuery = graphql` interface IngestionCsvMapperTestDialogProps { open: boolean onClose: () => void - uri: string - csvMapperId: string | Option + values: { + name: string, + description?: string | null, + authentication_type: string, + authentication_value?: string | null, + current_state_date: Date | null, + uri: string, + ingestion_running?: boolean | null, + csv_mapper_id: string | Option, + user_id: string | Option + } setIsCreateDisabled?: React.Dispatch> } const IngestionCsvMapperTestDialog: FunctionComponent = ({ open, onClose, - uri, - csvMapperId, + values, setIsCreateDisabled, }) => { const { t_i18n } = useFormatter(); - const [result, setResult] = useState(undefined); + const [result, setResult] = useState(undefined); + const [commitTest] = useMutation(ingestionCsvMapperTestMutation); const [loading, setLoading] = useState(false); const handleClose = () => { @@ -48,55 +56,43 @@ const IngestionCsvMapperTestDialog: FunctionComponent { + const onTest = () => { setLoading(true); - fetchQuery(ingestionCsvMapperTestQuery, { uri: url, csv_mapper_id }) - .toPromise() - .then((data) => { - const resultTest = (data as IngestionCsvMapperTestDialogQuery$data) - .test_mapper; + commitTest({ + variables: { + input: { + name: values.name, + description: values.description, + authentication_type: values.authentication_type, + authentication_value: values.authentication_value, + current_state_date: values.current_state_date, + uri: values.uri, + ingestion_running: values.ingestion_running, + user_id: typeof values.user_id === 'string' ? values.user_id : values.user_id.value, + csv_mapper_id: typeof values.csv_mapper_id === 'string' ? values.csv_mapper_id : values.csv_mapper_id.value, + }, + }, + onCompleted: (data) => { + const resultTest = (data as IngestionCsvMapperTestDialogMutation$data); if (resultTest) { - setResult({ - test_mapper: { - ...resultTest, - }, - }); + setResult(resultTest); if (setIsCreateDisabled) { - setIsCreateDisabled(resultTest.nbEntities === 0); + setIsCreateDisabled(resultTest.ingestionCsvTester?.nbEntities === 0); } } setLoading(false); - }).catch((error) => { + }, + onError: (error) => { handleError(error); setLoading(false); - }); + }, + }); }; return ( {t_i18n('Testing csv mapper')} - - - -
@@ -135,14 +131,14 @@ const IngestionCsvMapperTestDialog: FunctionComponent {t_i18n('Objects found')} : - {result?.test_mapper?.nbEntities} {t_i18n('Entities')} - {result?.test_mapper?.nbRelationships} {t_i18n('Relationships')} + {result?.ingestionCsvTester?.nbEntities} {t_i18n('Entities')} + {result?.ingestionCsvTester?.nbRelationships} {t_i18n('Relationships')} } diff --git a/opencti-platform/opencti-front/src/private/components/data/ingestionTaxii/IngestionTaxiiLine.jsx b/opencti-platform/opencti-front/src/private/components/data/ingestionTaxii/IngestionTaxiiLine.jsx index d772e046a677..1694061825a8 100644 --- a/opencti-platform/opencti-front/src/private/components/data/ingestionTaxii/IngestionTaxiiLine.jsx +++ b/opencti-platform/opencti-front/src/private/components/data/ingestionTaxii/IngestionTaxiiLine.jsx @@ -107,7 +107,7 @@ class IngestionTaxiiLineLineComponent extends Component { className={classes.bodyItem} style={{ width: dataColumns.added_after_start.width }} > - {node.added_after_start} + {node.added_after_start || node.current_state_cursor}
} @@ -144,6 +144,7 @@ const IngestionTaxiiLineFragment = createFragmentContainer( version ingestion_running added_after_start + current_state_cursor } `, }, diff --git a/opencti-platform/opencti-front/src/schema/relay.schema.graphql b/opencti-platform/opencti-front/src/schema/relay.schema.graphql index e58892465768..1b967d09b832 100644 --- a/opencti-platform/opencti-front/src/schema/relay.schema.graphql +++ b/opencti-platform/opencti-front/src/schema/relay.schema.graphql @@ -7375,7 +7375,6 @@ type Query { ingestionTaxiis(first: Int, after: ID, orderBy: IngestionTaxiiOrdering, orderMode: OrderingMode, filters: FilterGroup, includeAuthorities: Boolean, search: String): IngestionTaxiiConnection ingestionCsv(id: String!): IngestionCsv ingestionCsvs(first: Int, after: ID, orderBy: IngestionCsvOrdering, orderMode: OrderingMode, filters: FilterGroup, includeAuthorities: Boolean, search: String): IngestionCsvConnection - test_mapper(uri: String!, csv_mapper_id: String!): CsvMapperTestResult indicator(id: String!): Indicator indicators(first: Int, after: ID, orderBy: IndicatorsOrdering, orderMode: OrderingMode, filters: FilterGroup, search: String): IndicatorConnection indicatorsTimeSeries(objectId: String, field: String!, operation: StatsOperation!, startDate: DateTime!, endDate: DateTime!, interval: String!, filters: FilterGroup): [TimeSeries] @@ -8125,6 +8124,7 @@ type Mutation { ingestionTaxiiAdd(input: IngestionTaxiiAddInput!): IngestionTaxii ingestionTaxiiDelete(id: ID!): ID ingestionTaxiiFieldPatch(id: ID!, input: [EditInput!]!): IngestionTaxii + ingestionCsvTester(input: IngestionCsvAddInput!): CsvMapperTestResult ingestionCsvAdd(input: IngestionCsvAddInput!): IngestionCsv ingestionCsvDelete(id: ID!): ID ingestionCsvFieldPatch(id: ID!, input: [EditInput!]!): IngestionCsv @@ -10593,6 +10593,7 @@ type IngestionTaxii implements InternalObject & BasicObject { authentication_value: String user_id: String user: Creator + current_state_cursor: String added_after_start: DateTime ingestion_running: Boolean } diff --git a/opencti-platform/opencti-graphql/src/database/rabbitmq.js b/opencti-platform/opencti-graphql/src/database/rabbitmq.js index a7f8a8b05032..eee394d1255a 100644 --- a/opencti-platform/opencti-graphql/src/database/rabbitmq.js +++ b/opencti-platform/opencti-graphql/src/database/rabbitmq.js @@ -103,7 +103,7 @@ export const metrics = async (context, user) => { const metricApi = async () => { const ssl = USE_SSL_MGMT ? 's' : ''; const baseURL = `http${ssl}://${HOSTNAME_MGMT}:${PORT_MGMT}`; - const httpClient = getHttpClient({ + const httpClientOptions = { baseURL, responseType: 'json', rejectUnauthorized: RABBITMQ_MGMT_REJECT_UNAUTHORIZED, @@ -111,7 +111,8 @@ export const metrics = async (context, user) => { username: USERNAME, password: PASSWORD, }, - }); + }; + const httpClient = getHttpClient(httpClientOptions); const overview = await httpClient.get('/api/overview').then((response) => response.data); const queues = await httpClient.get(`/api/queues${VHOST_PATH}`).then((response) => response.data); // Compute number of push queues diff --git a/opencti-platform/opencti-graphql/src/domain/connector.js b/opencti-platform/opencti-graphql/src/domain/connector.js index 9f2260b5159d..b45b3b5141d6 100644 --- a/opencti-platform/opencti-graphql/src/domain/connector.js +++ b/opencti-platform/opencti-graphql/src/domain/connector.js @@ -207,7 +207,7 @@ export const fetchRemoteStreams = async (context, user, { uri, token, ssl_verify const httpClientOptions = { headers, rejectUnauthorized: ssl_verify ?? false, responseType: 'json' }; const httpClient = getHttpClient(httpClientOptions); const remoteUri = `${uri.endsWith('/') ? uri.slice(0, -1) : uri}/graphql`; - const { data } = await httpClient.post(remoteUri, { query }, { withCredentials: true }); + const { data } = await httpClient.post(remoteUri, { query }); return data.data.streamCollections.edges.map((e) => e.node); } catch (e) { throw ValidationError('uri', { message: 'Error getting the streams from remote OpenCTI', cause: e }); diff --git a/opencti-platform/opencti-graphql/src/generated/graphql.ts b/opencti-platform/opencti-graphql/src/generated/graphql.ts index d87d6de8f79a..bad5860d8057 100644 --- a/opencti-platform/opencti-graphql/src/generated/graphql.ts +++ b/opencti-platform/opencti-graphql/src/generated/graphql.ts @@ -9911,6 +9911,7 @@ export type IngestionTaxii = BasicObject & InternalObject & { authentication_value?: Maybe; collection: Scalars['String']['output']; created_at?: Maybe; + current_state_cursor?: Maybe; description?: Maybe; entity_type: Scalars['String']['output']; id: Scalars['ID']['output']; @@ -12264,6 +12265,7 @@ export type Mutation = { ingestionCsvAdd?: Maybe; ingestionCsvDelete?: Maybe; ingestionCsvFieldPatch?: Maybe; + ingestionCsvTester?: Maybe; ingestionRssAdd?: Maybe; ingestionRssDelete?: Maybe; ingestionRssFieldPatch?: Maybe; @@ -13194,6 +13196,11 @@ export type MutationIngestionCsvFieldPatchArgs = { }; +export type MutationIngestionCsvTesterArgs = { + input: IngestionCsvAddInput; +}; + + export type MutationIngestionRssAddArgs = { input: IngestionRssAddInput; }; @@ -17692,7 +17699,6 @@ export type Query = { tasks?: Maybe; taxiiCollection?: Maybe; taxiiCollections?: Maybe; - test_mapper?: Maybe; threatActor?: Maybe; threatActorGroup?: Maybe; threatActorIndividual?: Maybe; @@ -19943,12 +19949,6 @@ export type QueryTaxiiCollectionsArgs = { }; -export type QueryTest_MapperArgs = { - csv_mapper_id: Scalars['String']['input']; - uri: Scalars['String']['input']; -}; - - export type QueryThreatActorArgs = { id?: InputMaybe; }; @@ -32600,6 +32600,7 @@ export type IngestionTaxiiResolvers, ParentType, ContextType>; collection?: Resolver; created_at?: Resolver, ParentType, ContextType>; + current_state_cursor?: Resolver, ParentType, ContextType>; description?: Resolver, ParentType, ContextType>; entity_type?: Resolver; id?: Resolver; @@ -33587,6 +33588,7 @@ export type MutationResolvers, ParentType, ContextType, RequireFields>; ingestionCsvDelete?: Resolver, ParentType, ContextType, RequireFields>; ingestionCsvFieldPatch?: Resolver, ParentType, ContextType, RequireFields>; + ingestionCsvTester?: Resolver, ParentType, ContextType, RequireFields>; ingestionRssAdd?: Resolver, ParentType, ContextType, RequireFields>; ingestionRssDelete?: Resolver, ParentType, ContextType, RequireFields>; ingestionRssFieldPatch?: Resolver, ParentType, ContextType, RequireFields>; @@ -35082,7 +35084,6 @@ export type QueryResolvers, ParentType, ContextType, Partial>; taxiiCollection?: Resolver, ParentType, ContextType, RequireFields>; taxiiCollections?: Resolver, ParentType, ContextType, Partial>; - test_mapper?: Resolver, ParentType, ContextType, RequireFields>; threatActor?: Resolver, ParentType, ContextType, Partial>; threatActorGroup?: Resolver, ParentType, ContextType, Partial>; threatActorIndividual?: Resolver, ParentType, ContextType, RequireFields>; diff --git a/opencti-platform/opencti-graphql/src/manager/ingestionManager.ts b/opencti-platform/opencti-graphql/src/manager/ingestionManager.ts index 40edddc2e761..148e5336ab49 100644 --- a/opencti-platform/opencti-graphql/src/manager/ingestionManager.ts +++ b/opencti-platform/opencti-graphql/src/manager/ingestionManager.ts @@ -6,13 +6,12 @@ import * as R from 'ramda'; import { v4 as uuidv4 } from 'uuid'; import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async/dynamic'; import type { SetIntervalAsyncTimer } from 'set-interval-async/fixed'; -import { AxiosHeaders } from 'axios'; import type { Moment } from 'moment'; import { lockResource } from '../database/redis'; import conf, { booleanConf, logApp } from '../config/conf'; import { TYPE_LOCK_ERROR, UnknownError, UnsupportedError } from '../config/errors'; import { executionContext, SYSTEM_USER } from '../utils/access'; -import { type GetHttpClient, getHttpClient } from '../utils/http-client'; +import { type GetHttpClient, getHttpClient, OpenCTIHeaders } from '../utils/http-client'; import { isEmptyField, isNotEmptyField } from '../database/utils'; import { FROM_START_STR, sanitizeForMomentParsing, utcDate } from '../utils/format'; import { generateStandardId } from '../schema/identifier'; @@ -24,7 +23,7 @@ import type { AuthContext } from '../types/user'; import type { BasicStoreEntityIngestionCsv, BasicStoreEntityIngestionRss, BasicStoreEntityIngestionTaxii } from '../modules/ingestion/ingestion-types'; import { findAllTaxiiIngestions, patchTaxiiIngestion } from '../modules/ingestion/ingestion-taxii-domain'; import { TaxiiVersion } from '../generated/graphql'; -import { fetchCsvFromUrl, findAllCsvIngestions, patchCsvIngestion, testCsvIngestionMapping } from '../modules/ingestion/ingestion-csv-domain'; +import { fetchCsvFromUrl, findAllCsvIngestions, patchCsvIngestion } from '../modules/ingestion/ingestion-csv-domain'; import type { CsvMapperParsed } from '../modules/internal/csvMapper/csvMapper-types'; import { findById } from '../modules/internal/csvMapper/csvMapper-domain'; import { bundleProcess } from '../parser/csv-bundler'; @@ -108,10 +107,11 @@ const rssItemV2Convert = (turndownService: TurndownService, channel: RssElement, }; const rssHttpGetter = (): Getter => { - const httpClient = getHttpClient({ + const httpClientOptions: GetHttpClient = { responseType: 'text', headers: { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0' } - }); + }; + const httpClient = getHttpClient(httpClientOptions); return async (uri: string) => { const { data } = await httpClient.get(uri); return data; @@ -209,7 +209,7 @@ interface TaxiiResponseData { } const taxiiHttpGet = async (ingestion: BasicStoreEntityIngestionTaxii): Promise => { - const headers = new AxiosHeaders(); + const headers = new OpenCTIHeaders(); headers.Accept = 'application/taxii+json;version=2.1'; if (ingestion.authentication_type === 'basic') { const auth = Buffer.from(ingestion.authentication_value, 'utf-8').toString('base64'); @@ -285,55 +285,24 @@ const taxiiExecutor = async (context: AuthContext) => { // endregion // region Csv ingestion -interface CsvResponseData { - data: string, - addedLast: string | undefined | null -} -const csvHttpGet = async (ingestion: BasicStoreEntityIngestionCsv): Promise => { - const headers = new AxiosHeaders(); - headers.Accept = 'application/csv'; - if (ingestion.authentication_type === 'basic') { - const auth = Buffer.from(ingestion.authentication_value, 'utf-8').toString('base64'); - headers.Authorization = `Basic ${auth}`; - } - if (ingestion.authentication_type === 'bearer') { - headers.Authorization = `Bearer ${ingestion.authentication_value}`; - } - let certificates; - if (ingestion.authentication_type === 'certificate') { - const [cert, key, ca] = ingestion.authentication_value.split(':'); - certificates = { cert, key, ca }; - } - const httpClientOptions: GetHttpClient = { headers, rejectUnauthorized: false, responseType: 'json', certificates }; - const httpClient = getHttpClient(httpClientOptions); - const { data, headers: resultHeaders } = await httpClient.get(ingestion.uri); - return { data, addedLast: resultHeaders['x-csv-date-added-last'] }; -}; -const csvDataToObjects = async (data: string, ingestion: BasicStoreEntityIngestionCsv, csvMapper: CsvMapperParsed, context: AuthContext) => { - const entitiesData = data.split('\n'); - const csvBuffer = await fetchCsvFromUrl(ingestion.uri, csvMapper.skipLineChar); +const csvDataToObjects = async (csvBuffer: Buffer | string, ingestion: BasicStoreEntityIngestionCsv, csvMapper: CsvMapperParsed, context: AuthContext) => { const { objects } = await bundleProcess(context, context.user ?? SYSTEM_USER, csvBuffer, csvMapper); if (objects === undefined) { - const error = UnknownError('Undefined CSV objects', data); + const error = UnknownError('Undefined CSV objects', { data: csvBuffer.toString() }); logApp.error(error, { name: ingestion.name, context: 'CSV transform' }); + } else { + logApp.info(`[OPENCTI-MODULE] CSV ingestion execution for ${objects.length} items`); } - logApp.info(`[OPENCTI-MODULE] CSV ingestion execution for ${entitiesData.length} items`); return objects; }; const csvDataHandler = async (context: AuthContext, ingestion: BasicStoreEntityIngestionCsv) => { - const { data, addedLast } = await csvHttpGet(ingestion); const user = context.user ?? SYSTEM_USER; const csvMapper = await findById(context, user, ingestion.csv_mapper_id); const csvMapperParsed = parseCsvMapper(csvMapper); - const csvMappingTestResult = await testCsvIngestionMapping(context, user, ingestion.uri, ingestion.csv_mapper_id); - - if (!csvMappingTestResult.nbEntities) { - const error = UnknownError('Invalid data from URL', data); - logApp.error(error, { name: ingestion.name, context: 'CSV transform' }); - } - const isUnchangedData = bcrypt.compareSync(data, ingestion.current_state_hash ?? ''); + const { data, addedLast } = await fetchCsvFromUrl(csvMapperParsed, ingestion); + const isUnchangedData = bcrypt.compareSync(data.toString(), ingestion.current_state_hash ?? ''); if (isUnchangedData) { return; } @@ -353,7 +322,7 @@ const csvDataHandler = async (context: AuthContext, ingestion: BasicStoreEntityI } // Update the state - const hashedIncomingData = bcrypt.hashSync(data); + const hashedIncomingData = bcrypt.hashSync(data.toString()); await patchCsvIngestion(context, SYSTEM_USER, ingestion.internal_id, { current_state_hash: hashedIncomingData, added_after_start: utcDate(addedLast) diff --git a/opencti-platform/opencti-graphql/src/manager/publisherManager.ts b/opencti-platform/opencti-graphql/src/manager/publisherManager.ts index ecc19c9df02a..1fc20beec486 100644 --- a/opencti-platform/opencti-graphql/src/manager/publisherManager.ts +++ b/opencti-platform/opencti-graphql/src/manager/publisherManager.ts @@ -28,7 +28,7 @@ import { executionContext, SYSTEM_USER } from '../utils/access'; import { now } from '../utils/format'; import type { NotificationData } from '../utils/publisher-mock'; import { type ActivityNotificationEvent, type DigestEvent, getNotifications, type KnowledgeNotificationEvent, type NotificationUser } from './notificationManager'; -import { getHttpClient } from '../utils/http-client'; +import { type GetHttpClient, getHttpClient } from '../utils/http-client'; const DOC_URI = 'https://docs.opencti.io'; const PUBLISHER_ENGINE_KEY = conf.get('publisher_manager:lock_key'); @@ -121,8 +121,9 @@ export const internalProcessNotification = async ( const dataJson = JSON.parse(generatedWebhook); const dataHeaders = R.fromPairs((headers ?? []).map((h) => [h.attribute, h.value])); const dataParameters = R.fromPairs((params ?? []).map((h) => [h.attribute, h.value])); - const httpClient = getHttpClient({ responseType: 'json', headers: dataHeaders }); - await httpClient({ url, method: verb, params: dataParameters, data: dataJson }).catch((err) => { + const httpClientOptions: GetHttpClient = { responseType: 'json', headers: dataHeaders }; + const httpClient = getHttpClient(httpClientOptions); + await httpClient.call({ url, method: verb, params: dataParameters, data: dataJson }).catch((err) => { logApp.error(err, { manager: 'PUBLISHER_MANAGER' }); return { error: err }; }); diff --git a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-domain.ts b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-domain.ts index 58324febb958..d91cc8a1a628 100644 --- a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-domain.ts +++ b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-domain.ts @@ -1,4 +1,3 @@ -import axios from 'axios'; import type { AuthContext, AuthUser } from '../../types/user'; import { listAllEntities, listEntitiesPaginated, storeLoadById } from '../../database/middleware-loader'; import { type BasicStoreEntityIngestionCsv, ENTITY_TYPE_INGESTION_CSV } from './ingestion-types'; @@ -8,10 +7,11 @@ import type { CsvMapperTestResult, EditInput, IngestionCsvAddInput } from '../.. import { notify } from '../../database/redis'; import { BUS_TOPICS } from '../../config/conf'; import { ABSTRACT_INTERNAL_OBJECT } from '../../schema/general'; -import { type BasicStoreEntityCsvMapper, ENTITY_TYPE_CSV_MAPPER } from '../internal/csvMapper/csvMapper-types'; +import { type BasicStoreEntityCsvMapper, type CsvMapperParsed, ENTITY_TYPE_CSV_MAPPER } from '../internal/csvMapper/csvMapper-types'; import { bundleProcess } from '../../parser/csv-bundler'; import { findById as findCsvMapperById } from '../internal/csvMapper/csvMapper-domain'; import { parseCsvMapper } from '../internal/csvMapper/csvMapper-utils'; +import { type GetHttpClient, getHttpClient, OpenCTIHeaders } from '../../utils/http-client'; export const findById = (context: AuthContext, user: AuthUser, ingestionId: string) => { return storeLoadById(context, user, ingestionId, ENTITY_TYPE_INGESTION_CSV); @@ -77,35 +77,52 @@ export const deleteIngestionCsv = async (context: AuthContext, user: AuthUser, i return ingestionId; }; -export const fetchCsvFromUrl = async (url: string, csvMapperSkipLineChar: string | undefined): Promise => { - const response = await axios.get(url, { responseType: 'arraybuffer' }); - const dataExtract = response.data.toString().split('\n') - .filter((line: string) => ( - (!!csvMapperSkipLineChar && !line.startsWith(csvMapperSkipLineChar)) - || (!csvMapperSkipLineChar && !!line) - )) - .join('\n'); - return Buffer.from(dataExtract); -}; +interface CsvResponseData { + data: Buffer, + addedLast: string | undefined | null +} -export const fetchCsvExtractFromUrl = async (url: string, csvMapperSkipLineChar: string | undefined): Promise => { - const response = await axios.get(url, { responseType: 'arraybuffer' }); - const TEST_LIMIT = 50; - const dataExtract = response.data.toString().split('\n') +export const fetchCsvFromUrl = async (csvMapper: CsvMapperParsed, ingestion: BasicStoreEntityIngestionCsv, opts: { limit?: number } = {}): Promise => { + const { limit = undefined } = opts; + const headers = new OpenCTIHeaders(); + headers.Accept = 'application/csv'; + if (ingestion.authentication_type === 'basic') { + const auth = Buffer.from(ingestion.authentication_value || '', 'utf-8').toString('base64'); + headers.Authorization = `Basic ${auth}`; + } + if (ingestion.authentication_type === 'bearer') { + headers.Authorization = `Bearer ${ingestion.authentication_value}`; + } + let certificates; + if (ingestion.authentication_type === 'certificate') { + const [cert, key, ca] = (ingestion.authentication_value || '').split(':'); + certificates = { cert, key, ca }; + } + const httpClientOptions: GetHttpClient = { headers, rejectUnauthorized: false, responseType: 'arraybuffer', certificates }; + const httpClient = getHttpClient(httpClientOptions); + const { data, headers: resultHeaders } = await httpClient.get(ingestion.uri); + const dataLines = data.toString().split('\n'); + const dataExtract = dataLines .filter((line: string) => ( - (!!csvMapperSkipLineChar && !line.startsWith(csvMapperSkipLineChar)) - || (!csvMapperSkipLineChar && !!line) + (!!csvMapper.skipLineChar && !line.startsWith(csvMapper.skipLineChar)) + || (!csvMapper.skipLineChar && !!line) )) - .slice(0, TEST_LIMIT) + .slice(0, limit ?? dataLines.length) .join('\n'); - return Buffer.from(dataExtract); + return { data: Buffer.from(dataExtract), addedLast: resultHeaders['x-csv-date-added-last'] }; }; -export const testCsvIngestionMapping = async (context: AuthContext, user: AuthUser, uri: string, csv_mapper_id: string): Promise => { - const csvMapper = await findCsvMapperById(context, user, csv_mapper_id); +export const testCsvIngestionMapping = async (context: AuthContext, user: AuthUser, input: IngestionCsvAddInput): Promise => { + const csvMapper = await findCsvMapperById(context, user, input.csv_mapper_id); const parsedMapper = parseCsvMapper(csvMapper); - const csvBuffer = await fetchCsvExtractFromUrl(uri, csvMapper.skipLineChar); - const bundle = await bundleProcess(context, user, csvBuffer, parsedMapper); + const ingestion = { + csv_mapper_id: input.csv_mapper_id, + uri: input.uri, + authentication_type: input.authentication_type, + authentication_value: input.authentication_value + } as BasicStoreEntityIngestionCsv; + const { data } = await fetchCsvFromUrl(parsedMapper, ingestion, { limit: 50 }); + const bundle = await bundleProcess(context, user, data, parsedMapper); return { objects: JSON.stringify(bundle.objects, null, 2), nbRelationships: bundle.objects.filter((object) => object.type === 'relationship').length, diff --git a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-resolver.ts b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-resolver.ts index ce2c1d09ebda..f032bd428dc9 100644 --- a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-resolver.ts +++ b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv-resolver.ts @@ -17,13 +17,15 @@ const ingestionCsvResolvers: Resolvers = { Query: { ingestionCsv: (_, { id }, context) => findById(context, context.user, id), ingestionCsvs: (_, args, context) => findAllPaginated(context, context.user, args), - test_mapper: (_, { uri, csv_mapper_id }, context) => testCsvIngestionMapping(context, context.user, uri, csv_mapper_id), }, IngestionCsv: { user: (ingestionCsv, _, context) => creatorLoader.load(ingestionCsv.user_id, context, context.user), csvMapper: (ingestionCsv, _, context) => findCsvMapperForIngestionById(context, context.user, ingestionCsv.csv_mapper_id), }, Mutation: { + ingestionCsvTester: (_, { input }, context) => { + return testCsvIngestionMapping(context, context.user, input); + }, ingestionCsvAdd: (_, { input }, context) => { return addIngestionCsv(context, context.user, input); }, diff --git a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv.graphql b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv.graphql index 6a8e68f65cbd..dcb2ca3510ff 100644 --- a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv.graphql +++ b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-csv.graphql @@ -55,10 +55,6 @@ type Query { includeAuthorities: Boolean search: String ): IngestionCsvConnection @auth(for: [SETTINGS]) - test_mapper( - uri: String! - csv_mapper_id: String! - ): CsvMapperTestResult @auth(for: [SETTINGS]) } # Mutations @@ -75,6 +71,7 @@ input IngestionCsvAddInput { } type Mutation { + ingestionCsvTester(input: IngestionCsvAddInput!): CsvMapperTestResult @auth(for: [SETTINGS]) ingestionCsvAdd(input: IngestionCsvAddInput!): IngestionCsv @auth(for: [SETTINGS]) ingestionCsvDelete(id: ID!): ID @auth(for: [SETTINGS]) ingestionCsvFieldPatch(id: ID!, input: [EditInput!]!): IngestionCsv @auth(for: [SETTINGS]) diff --git a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-taxii.graphql b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-taxii.graphql index 1aae5ac1a41d..b6da91aa483e 100644 --- a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-taxii.graphql +++ b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-taxii.graphql @@ -27,6 +27,7 @@ type IngestionTaxii implements InternalObject & BasicObject { authentication_value: String user_id: String user: Creator + current_state_cursor: String added_after_start: DateTime ingestion_running: Boolean } diff --git a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-types.ts b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-types.ts index 1f897bb686b6..7483ff748010 100644 --- a/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-types.ts +++ b/opencti-platform/opencti-graphql/src/modules/ingestion/ingestion-types.ts @@ -84,7 +84,7 @@ export interface BasicStoreEntityIngestionCsv extends BasicStoreEntity { csvMapper: CsvMapper csv_mapper_id: string authentication_type: 'none' | 'basic' | 'bearer' | 'certificate' - authentication_value: string + authentication_value?: string | null user_id: string | undefined ingestion_running: boolean } diff --git a/opencti-platform/opencti-graphql/src/utils/http-client.ts b/opencti-platform/opencti-graphql/src/utils/http-client.ts index b0134fb42790..d6a771e8aa7e 100644 --- a/opencti-platform/opencti-graphql/src/utils/http-client.ts +++ b/opencti-platform/opencti-graphql/src/utils/http-client.ts @@ -1,8 +1,11 @@ import https from 'node:https'; -import axios, { type AxiosHeaders, type HeadersDefaults, type RawAxiosRequestHeaders } from 'axios'; -import { getPlatformHttpProxies } from '../config/conf'; +import axios, { AxiosHeaders, type AxiosRequestConfig, type HeadersDefaults, type RawAxiosRequestHeaders } from 'axios'; +import { Agent } from 'https'; +import { getPlatformHttpProxyAgent } from '../config/conf'; import { fromBase64, isNotEmptyField } from '../database/utils'; +export class OpenCTIHeaders extends AxiosHeaders {} + export interface Certificates { cert: string, key: string, @@ -19,20 +22,33 @@ export interface GetHttpClient { password: string } } + +const buildHttpAgentOpts = (uri: string, baseURL: string | undefined, defaultHttpsAgent: Agent) => { + const agentUri = baseURL ? `${baseURL}${uri}` : uri; + return { + httpAgent: getPlatformHttpProxyAgent(agentUri), + httpsAgent: getPlatformHttpProxyAgent(agentUri) ?? defaultHttpsAgent, + proxy: false // Disable direct proxy protocol in http adapter + }; +}; export const getHttpClient = ({ baseURL, headers, rejectUnauthorized, responseType, certificates, auth }: GetHttpClient) => { - const proxies = getPlatformHttpProxies(); + // Build a default https agent to force query options if no proxy is setup const cert = isNotEmptyField(certificates?.cert) ? fromBase64(certificates?.cert) : undefined; const key = isNotEmptyField(certificates?.key) ? fromBase64(certificates?.key) : undefined; const ca = isNotEmptyField(certificates?.ca) ? fromBase64(certificates?.ca) : undefined; const defaultHttpsAgent = new https.Agent({ rejectUnauthorized: rejectUnauthorized === true, cert, key, ca }); - return axios.create({ + // Create the default caller + const caller = axios.create({ baseURL, responseType, headers, auth, withCredentials: true, - httpAgent: proxies['http:']?.build(), - httpsAgent: proxies['https:']?.build() ?? defaultHttpsAgent, - proxy: false // Disable direct proxy protocol in axios http adapter }); + // Override methods to setup correct http agents + return { + call: (config: AxiosRequestConfig) => caller(config), + get: async (url: string, opts: any = {}) => caller.get(url, { ...opts, ...buildHttpAgentOpts(url, baseURL, defaultHttpsAgent) }), + post: async (url: string, data: object, opts: any = {}) => caller.post(url, data, { ...opts, ...buildHttpAgentOpts(url, baseURL, defaultHttpsAgent) }) + }; };