Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dragos]: feat implement api client #3505

Open
wants to merge 22 commits into
base: feat/2570-dragos-create-the-connector
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
164d1aa
deps: add needed dependencies
flavienSindou Feb 26, 2025
1242f9b
feat: implement tooling
flavienSindou Feb 26, 2025
77ac347
feat: implement Base V1 API client
flavienSindou Feb 26, 2025
cd81594
feat: implement indicator and product endpoints
flavienSindou Feb 26, 2025
4b8cb79
feat: wrap endpoint in global V1 Client
flavienSindou Feb 26, 2025
4efb3d7
doc: add README and example
flavienSindou Feb 26, 2025
d02fd15
tests: add unit tests
flavienSindou Feb 26, 2025
a79c434
fix: typing error
flavienSindou Feb 26, 2025
c989158
fix: remove dev tests
flavienSindou Feb 26, 2025
3147158
fix: remove duplicated import
flavienSindou Feb 26, 2025
77149f6
chore: style
flavienSindou Feb 26, 2025
5eb4956
refacto: use ResponseModel alias
flavienSindou Feb 27, 2025
beb708f
fix: typing issue
flavienSindou Feb 27, 2025
6bf7d93
feat: add dev fake api server
flavienSindou Feb 27, 2025
95bd1df
doc: update readme and provide screen shots
flavienSindou Feb 27, 2025
b191e2f
deps: add needed dev requirements
flavienSindou Feb 28, 2025
9bf8fe2
feat: add check if product exists before generating fake PDF
flavienSindou Feb 28, 2025
20bf5af
fix: https://github.com/OpenCTI-Platform/connectors/pull/3505#discuss…
flavienSindou Mar 6, 2025
6212070
doc: https://github.com/OpenCTI-Platform/connectors/pull/3505#discuss…
flavienSindou Mar 6, 2025
ec91003
chore: https://github.com/OpenCTI-Platform/connectors/pull/3505#discu…
flavienSindou Mar 6, 2025
57b411b
style
flavienSindou Mar 6, 2025
c13b64e
style
flavienSindou Mar 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions external-import/dragos/client_api/README.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crystal clear and the examples work fine 👍

Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# client_api Package

The `client_api` package provides tools and clients to interact with the Dragos Worldview API. It includes modules for handling common functionalities, errors, warnings, and specific API endpoints such as indicators and products.

## Reference

- Worldview V1 API documentation [consulted on 2025-02-26]: https://portal.dragos.com/api/v1/doc/index.html

## Quick start

To use the clients provided by this package, you need to initialize them with the appropriate parameters such as `base_url`, `token`, `secret`, `timeout`, `retry`, and `backoff`. The clients offer methods to make requests to the API and handle the responses.

Example:

```python
from datetime import datetime, timedelta, timezone
from yarl import URL
from pydantic import SecretStr

from client_api.v1 import DragosClientAPIV1
from client_api.error import DragosAPIError

client = DragosClientAPIV1(
base_url=URL("https://portal.dragos.com"),
token=SecretStr("ChangeMe"),
secret=SecretStr("ChangeMe"),
timeout=timedelta(seconds=10),
retry=3,
backoff=timedelta(seconds=5),
)
async def last_day():
# Note: this assumes no errors are raised in the request in iter_indicators.
async for indicator in client.indicator.iter_indicators(
updated_after=datetime.now(timezone.utc) - timedelta(days=1)
):
# Complex logic here
pass

asyncio.run(last_day())
```


## Dev

A dev fake server Api is provided to test the client. It is a simple FastAPI server that simulates the Dragos API. It is used for testing the client.

To use it you need to install the project with the `dev`extra:

```bash
pip install -e .[dev]
```

### Data
The fake server uses a simple json file to store the data. The data should be stored in the `client_api/dev/fake_server/data` directory in `products.json`and `indicator.json` files.

Lucky Filigran Developper can find a complete example on connector development Notion Page (under Usefull Resource section).

### Run

The fake server can be run with the following command:

```bash
cd client_api/dev/fake_server
python -m uvicorn main:app --port 4000
```
Then you can find the Base URL in the terminal output (here http://127.0.0.1:4000).
```
INFO: Started server process [15748]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:4000 (Press CTRL+C to quit)
INFO: 127.0.0.1:65483 - "GET /api/v1/products/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49952 - "GET /api/v1/products/DOM-2024-08 HTTP/1.1" 200 OK
```

An interactive documentation is available at http://<base_url>/api/v1/docs

### Authentication

You must use a header with the values

API-Token: dev
API-Secret: dev

Not providing this can be useful to test application reaction to a 401 error response.

### Results

see [docs](./dev/docs/)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you want to display images here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to provide a link to keep README readable


## Modules

### error
This module defines custom exceptions for handling errors related to the Dragos API.

### warning
This module provides custom warnings and tools for handling validation warnings in the API responses.

### v1
This subpackage contains modules for interacting with version 1 of the Dragos Worldview API. It includes clients and response models for specific API endpoints such as indicators and products.

#### indicator
This module provides the client and response models for the Dragos Worldview API indicator endpoint. It includes classes for handling indicator responses and making requests to the indicator API.

#### product
This module provides the client and response models for the Dragos Worldview API product endpoint. It includes classes for handling product responses and making requests to the product API.
1 change: 1 addition & 0 deletions external-import/dragos/client_api/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Offer common tools for the Dragos APIs."""
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
36 changes: 36 additions & 0 deletions external-import/dragos/client_api/dev/fake_server/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# isort:skip_file
"""Offer fake server for the Dragos API V1 endpoints."""

from typing import Awaitable, Callable

from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

from client_api.dev.fake_server.v1.indicators import router as indicators_router
from client_api.dev.fake_server.v1.product import router as product_router


class V1AuthMiddleware(BaseHTTPMiddleware):
"""Define Middleware to authenticate requests to /api/v1/* endpoints."""

async def dispatch(
self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response | JSONResponse:
"""Dispatch method for the middleware."""
api_token = request.headers.get("API-Token")
api_secret = request.headers.get("API-Secret")

if api_token != "dev" or api_secret != "dev": # noqa: S105
return JSONResponse(status_code=401, content={"detail": "Unauthorized"})

return await call_next(request)


v1_app = FastAPI()
v1_app.include_router(product_router)
v1_app.include_router(indicators_router)

app = FastAPI()
app.mount("/api/v1", v1_app)
app.add_middleware(V1AuthMiddleware)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[
{
"id": 1234,
"value": "example.com",
"indicator_type": "domain",
"category": null,
"comment": null,
"first_seen": "2023-01-30T08:03:31.000Z",
"last_seen": "2024-07-17T16:35:15.000Z",
"updated_at": "2024-07-17T16:47:01.000Z",
"confidence": "moderate",
"kill_chain": null,
"uuid": "19e48e39-68f8-46cf-99d0-cdc46c60cf65",
"status": "released",
"severity": null,
"attack_techniques": [
"Command and Control:Dynamic Resolution",
"Resource Development:Acquire Infrastructure"
],
"ics_attack_techniques": [],
"kill_chains": [],
"pre_attack_techniques": [],
"threat_groups": [],
"products": [
{
"serial": "DEMO_SERIAL"
}
]
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"tlp_level": "AMBER",
"title": "Example Title",
"executive_summary": "Complete summary",
"updated_at": "2024-05-15T18:09:57.000Z",
"threat_level": 3,
"serial": "DEMO_SERIAL",
"ioc_count": 1,
"tags": [
{
"text": "Energy",
"tag_type": "Industry"
},
{
"text": "Europe",
"tag_type": "GeographicLocation"
}
],
"release_date": "2024-03-01T01:31:09.000Z",
"type": "Suspect Domain Report",
"report_link": "http://example.com/api/v1/products/DEMO_SERIAL/report",
"ioc_csv_link": "http://example.com/api/v1/products/DEMO_SERIAL/csv",
"ioc_stix2_link": "http://example.com/api/v1/products/DEMO_SERIAL/stix2"
}
]
77 changes: 77 additions & 0 deletions external-import/dragos/client_api/dev/fake_server/v1/indicators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# isort:skip
"""Offer fake api/v1/indicators endpoint router."""

import json
from pathlib import Path
from typing import List, Optional

from fastapi import APIRouter, Query
from fastapi.responses import JSONResponse

router = APIRouter(prefix="/indicators", tags=["Indicators"])

indicators_json_path = Path(__file__).parent.resolve() / "data" / "indicators.json"


@router.get("/")
async def get_indicators(
exclude_suspect_domain: bool = Query(
False,
description="Exclude indicators that are only associated with Suspect Domain Reports",
),
page: int = Query(1, description="Page number"),
page_size: int = Query(500, description="Page size", le=1000),
updated_after: Optional[str] = Query(None, description="Filter by update date"),
value: Optional[str] = Query(None, description="Filter by value"),
type: Optional[str] = Query(None, description="Filter by type"),
serial: Optional[List[str]] = Query( # noqa: B008
None, description="Filter by serials"
),
tags: Optional[List[str]] = Query(None, description="Filter by tags"), # noqa: B008
) -> JSONResponse:
"""Get indicators."""
# load indicators from /data/indicators.json
with open(indicators_json_path, "r", encoding="utf8") as f:
indicators = json.load(f)

# filter indicators
if exclude_suspect_domain:
indicators = [i for i in indicators if not i.get("suspect_domain")]
if updated_after:
indicators = [i for i in indicators if i["updated_at"] > updated_after]
if value:
indicators = [i for i in indicators if value in i["value"]]
if type:
indicators = [i for i in indicators if i["type"] == type]
if serial:
indicators = [i for i in indicators if i["serial"] in serial]
if tags:
indicators = [i for i in indicators if any(tag in i["tags"] for tag in tags)]

# paginate indicators
total = len(indicators)
total_pages = (total + page_size - 1) // page_size
indicators = indicators[(page - 1) * page_size : page * page_size]

return JSONResponse(
{
"indicators": indicators,
"total": total,
"page_size": page_size,
"total_pages": total_pages,
"page": page,
},
status_code=200,
)


@router.get(".stix2")
async def get_indicators_dot_stix2() -> JSONResponse:
"""Get indicators stix2."""
raise NotImplementedError


@router.get("/stix2")
async def get_indicators_stix2() -> JSONResponse:
"""Get indicators stix2."""
raise NotImplementedError
Loading