Commit d4d74f6

Add data from SeaMIS via CSV files (#145)
* Work on CSV files
* Create and describe aggregates
* Add code tables
* Check mapping completeness
* Work on unique codes
* Download remote FTP folder
* Keep latest version
* Improve operations to keep
* Work on DAG download_secmar_csv_ftp
* Add other tasks
* Add process_all_days
* Work on replacement mapping
* Add create_cleaned_aggregate_files
* Resolve path
* Add base Embulk files
* Base Embulk import
* Fix filepath
* Clean timestamps
* Clean force fields
* Improve datetimes parsing
* Improve drop_duplicates for operations
* Convert wind from radians to degrees
* Add pourquoi_alerte in secmar_operation_id
* Improve the codes
* Work on the cross_sitrep columns
* Rework Embulk order
* Rework cross_sitrep
* Update checks with new cross_sitrep format
* Add 3 new cols in operations
* Rework est_metropolitain
* Work on insert SQL scripts
* Fix duration computation
* Rewrite run SQL task
* Start to write checks
* Add missing dependency
* Download next day if available
* Replace ShortCircuit with BranchOperator
* Move to main DAG
* Update insert for secmar_csv_operations
* Map Polynésie française to Polynésie
* Replace more NA fields for the map
* Reorganize
1 parent 0bf9522 commit d4d74f6

42 files changed: +1372 −38 lines

.circleci/config.yml

+1 −1

@@ -59,7 +59,7 @@ jobs:
             - ~/.cache/yarn
       - run:
           name: Pull CHANGELOG
-          command: wget https://raw.githubusercontent.com/MTES-MCT/secmar-data/master/CHANGELOG.md -O doc/CHANGELOG.md
+          command: wget https://raw.githubusercontent.com/snosan-tools/secmar-data/master/CHANGELOG.md -O doc/CHANGELOG.md
       - run:
           name: Build Markdown documentation for opendata
           command: |

.gitignore

+2

@@ -8,3 +8,5 @@ app/node_modules
 app/platforms
 app/plugins
 app/resources/signing
+
+snosan_csv

airflow/.python-version

+1

@@ -0,0 +1 @@
+3.5.2
airflow/dags/download_secmar_csv_ftp.py

+124 (new file)

# -*- coding: utf-8 -*-
"""
# download_secmar_csv_ftp
This DAG downloads CSV files from the remote FTP server.
"""
import os
from datetime import datetime
from pathlib import Path

import helpers
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from transformers import secmar_csv

default_args = helpers.default_args({"start_date": datetime(2022, 6, 22, 10, 0)})

dag = DAG(
    "download_secmar_csv_ftp",
    default_args=default_args,
    max_active_runs=1,
    concurrency=1,
    catchup=True,
    schedule_interval="0 10 * * *",
)
dag.doc_md = __doc__


def setup_ftp_env():
    os.environ["FTP_PROXY"] = "false"
    os.environ["FTP_HOST"] = Variable.get("SECMAR_CSV_FTP_HOST")
    os.environ["FTP_USER"] = Variable.get("SECMAR_CSV_FTP_USER")
    os.environ["FTP_PASSWORD"] = Variable.get("SECMAR_CSV_FTP_PASSWORD")


def ftp_download_fn(**kwargs):
    setup_ftp_env()
    secmar_csv.ftp_download_remote_folder(kwargs["templates_dict"]["day"])


def check_if_next_day_exists_fn(**kwargs):
    setup_ftp_env()
    # Branch: fetch tomorrow's folder when it is already on the server,
    # otherwise move straight to processing.
    if secmar_csv.day_exists_in_remote_ftp(kwargs["templates_dict"]["day"]):
        return "download_next_day"
    return "process_all_days"


def embulk_import(dag, table):
    script = "secmar_csv_%s" % table
    filepath = str(secmar_csv.AGGREGATE_FOLDER / (table + ".cleaned.csv"))
    return helpers.embulk_run(dag, script, {"EMBULK_FILEPATH": filepath})


download_single_day = PythonOperator(
    task_id="download_single_day",
    python_callable=ftp_download_fn,
    provide_context=True,
    dag=dag,
    templates_dict={"day": "{{ ds_nodash }}"},
)

check_if_next_day_exists = BranchPythonOperator(
    task_id="check_if_next_day_exists",
    python_callable=check_if_next_day_exists_fn,
    provide_context=True,
    dag=dag,
    templates_dict={"day": "{{ tomorrow_ds_nodash }}"},
)
check_if_next_day_exists.set_upstream(download_single_day)

download_next_day = PythonOperator(
    task_id="download_next_day",
    python_callable=ftp_download_fn,
    provide_context=True,
    dag=dag,
    templates_dict={"day": "{{ tomorrow_ds_nodash }}"},
)
download_next_day.set_upstream(check_if_next_day_exists)

process_all_days = PythonOperator(
    task_id="process_all_days",
    python_callable=secmar_csv.process_all_days,
    dag=dag,
    # Run whether or not the download_next_day branch was skipped.
    trigger_rule="all_done",
)
process_all_days.set_upstream(download_single_day)
process_all_days.set_upstream(download_next_day)

build_aggregate_files = PythonOperator(
    task_id="build_aggregate_files",
    python_callable=secmar_csv.build_aggregate_files,
    dag=dag,
)
build_aggregate_files.set_upstream(process_all_days)

check_mapping_data = PythonOperator(
    task_id="check_mapping_data",
    python_callable=secmar_csv.check_mapping_data,
    dag=dag,
)
check_mapping_data.set_upstream(build_aggregate_files)

create_cleaned_aggregate_files = PythonOperator(
    task_id="create_cleaned_aggregate_files",
    python_callable=secmar_csv.create_cleaned_aggregate_files,
    dag=dag,
)
create_cleaned_aggregate_files.set_upstream(check_mapping_data)

start_embulk = DummyOperator(task_id="start_embulk", dag=dag)
end_embulk = DummyOperator(task_id="end_embulk", dag=dag)
start_embulk.set_upstream(create_cleaned_aggregate_files)

# Import the operation table first, then every other table in parallel.
operation_embulk = embulk_import(dag, "operation")
operation_embulk.set_upstream(start_embulk)
operation_embulk.set_downstream(end_embulk)

for table in [
    Path(f).stem for f in secmar_csv.EXPECTED_FILENAMES if not f.startswith("operation")
]:
    t = embulk_import(dag, table)
    t.set_upstream(operation_embulk)
    t.set_downstream(end_embulk)

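The DAG leans on helpers from transformers.secmar_csv (ftp_download_remote_folder, day_exists_in_remote_ftp, process_all_days, ...) that are not shown in this diff. For orientation, here is a minimal ftplib-based sketch of the two FTP helpers; the DOWNLOAD_FOLDER constant and the day-named remote folder layout are assumptions, not code from the repository:

import ftplib
import os
from pathlib import Path

DOWNLOAD_FOLDER = Path("/tmp/snosan_csv")  # assumed local target folder


def _connect():
    # setup_ftp_env() in the DAG exports these variables before each call
    ftp = ftplib.FTP(os.environ["FTP_HOST"])
    ftp.login(os.environ["FTP_USER"], os.environ["FTP_PASSWORD"])
    return ftp


def day_exists_in_remote_ftp(day):
    # `day` is a yyyymmdd string rendered from {{ ds_nodash }} or {{ tomorrow_ds_nodash }}
    ftp = _connect()
    try:
        return day in ftp.nlst()
    finally:
        ftp.quit()


def ftp_download_remote_folder(day):
    # Mirror the remote folder named after the day into DOWNLOAD_FOLDER/<day>
    ftp = _connect()
    local = DOWNLOAD_FOLDER / day
    local.mkdir(parents=True, exist_ok=True)
    try:
        ftp.cwd(day)
        for filename in ftp.nlst():
            with open(str(local / filename), "wb") as f:
                ftp.retrbinary("RETR " + filename, f.write)
    finally:
        ftp.quit()

Note the branching design: check_if_next_day_exists returns the task_id to follow, so download_next_day is skipped when tomorrow's folder is absent, and process_all_days still runs in both cases thanks to trigger_rule="all_done".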
airflow/dags/extract_secmar.py

+59 −13

@@ -7,20 +7,23 @@
 """
 from datetime import datetime
 
+import helpers
 from airflow import DAG
-from airflow.operators.python_operator import PythonOperator
+from airflow.hooks.postgres_hook import PostgresHook
 from airflow.operators.bash_operator import BashOperator
-from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.check_operator import CheckOperator
-from airflow.hooks.postgres_hook import PostgresHook
-from operators.pg_download_operator import PgDownloadOperator
 from airflow.operators.dagrun_operator import TriggerDagRunOperator
-
-
-import helpers
-from secmar_dags import in_path, out_path, secmar_transform
-from secmar_dags import SECMAR_TABLES, secmar_transformer
-from secmar_checks import checks
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
+from operators.pg_download_operator import PgDownloadOperator
+from secmar_checks import checks, secmar_csv_checks
+from secmar_dags import (
+    SECMAR_TABLES,
+    in_path,
+    out_path,
+    secmar_transform,
+    secmar_transformer,
+)
 
 default_args = helpers.default_args({"start_date": datetime(2018, 4, 27, 5, 40)})
 
@@ -89,6 +92,22 @@ def execute_sql_file(filename):
     return PostgresHook("postgresql_local").run(content)
 
 
+def _execute_secmar_csv_sql_file(filename):
+    path = helpers.secmar_csv_sql_path(filename)
+    with open(path, "r", encoding="utf-8") as f:
+        content = f.read()
+    return PostgresHook("postgresql_local").run(content)
+
+
+def secmar_csv_sql_task(dag, filename):
+    return PythonOperator(
+        task_id="run_" + filename,
+        python_callable=lambda **kwargs: _execute_secmar_csv_sql_file(filename),
+        provide_context=True,
+        dag=dag,
+    )
+
+
 def make_distance_fn(distance):
     return lambda **kwargs: execute_sql_file(distance)
 
@@ -147,6 +166,33 @@ def embulk_import(dag, table):
 )
 delete_invalid_operations.set_upstream(end_import)
 
+# Insert data fetched by FTP, "secmar_csv"
+start_secmar_csv_insert = DummyOperator(task_id="start_secmar_csv_insert", dag=dag)
+end_secmar_csv_insert = DummyOperator(task_id="end_secmar_csv_insert", dag=dag)
+start_secmar_csv_insert.set_upstream(delete_invalid_operations)
+
+insert_operations = secmar_csv_sql_task(dag, "insert_operations")
+insert_operations.set_upstream(start_secmar_csv_insert)
+
+for table in ["flotteurs", "resultats_humain", "moyens"]:
+    t = secmar_csv_sql_task(dag, "insert_{table}".format(table=table))
+    t.set_upstream(insert_operations)
+    t.set_downstream(end_secmar_csv_insert)
+
+start_secmar_csv_checks = DummyOperator(task_id="start_secmar_csv_checks", dag=dag)
+end_secmar_csv_checks = DummyOperator(task_id="end_secmar_csv_checks", dag=dag)
+start_secmar_csv_checks.set_upstream(end_secmar_csv_insert)
+
+for check_name, query in secmar_csv_checks().items():
+    t = CheckOperator(
+        task_id="check_consistency_" + check_name,
+        sql=query,
+        conn_id="postgresql_local",
+        dag=dag,
+    )
+    t.set_upstream(start_secmar_csv_checks)
+    t.set_downstream(end_secmar_csv_checks)
+
 insert_operations_stats = PythonOperator(
     task_id="insert_operations_stats",
     python_callable=insert_operations_stats_fn,
@@ -161,15 +207,15 @@ def embulk_import(dag, table):
     provide_context=True,
     dag=dag,
 )
-prepare_operations_points.set_upstream(delete_invalid_operations)
+prepare_operations_points.set_upstream(end_secmar_csv_checks)
 
 insert_moyens_snsm = PythonOperator(
     task_id="insert_moyens_snsm",
     python_callable=insert_moyens_snsm_fn,
     provide_context=True,
     dag=dag,
 )
-insert_moyens_snsm.set_upstream(delete_invalid_operations)
+insert_moyens_snsm.set_upstream(end_secmar_csv_checks)
 insert_moyens_snsm.set_downstream(start_checks)
 
 distances = [
@@ -213,7 +259,7 @@ def embulk_import(dag, table):
     csv_params={"sep": ",", "index": False},
     dag=dag,
 )
-download_operations_local_time.set_upstream(delete_invalid_operations)
+download_operations_local_time.set_upstream(end_secmar_csv_checks)
 
 transform_operations_stats = PythonOperator(
     task_id="transform_operations_stats",

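Pieced together, the new secmar_csv tasks slot between the existing import and the downstream tasks. A sketch of the resulting chain, for orientation only (not code from the commit):

# delete_invalid_operations
#   >> start_secmar_csv_insert
#   >> run_insert_operations
#   >> [run_insert_flotteurs, run_insert_resultats_humain, run_insert_moyens]
#   >> end_secmar_csv_insert
#   >> start_secmar_csv_checks
#   >> [one check_consistency_<name> task per secmar_csv_checks() entry]
#   >> end_secmar_csv_checks
#   >> prepare_operations_points, insert_moyens_snsm, download_operations_local_time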
airflow/dags/helpers.py

+6
@@ -16,6 +16,12 @@ def opendata_sql_path(filename):
     )
 
 
+def secmar_csv_sql_path(filename):
+    return "{base}/opendata_sql/secmar_csv/{filename}.sql".format(
+        base=base_path(), filename=filename
+    )
+
+
 def data_path(filename):
     return "{base}/data/{filename}".format(base=base_path(), filename=filename)

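A quick usage example, assuming base_path() returns the repository root (its implementation is not part of this diff):

import helpers

helpers.secmar_csv_sql_path("insert_operations")
# -> "<repo>/opendata_sql/secmar_csv/insert_operations.sql"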
airflow/dags/secmar_checks.py

+34 −4

@@ -126,10 +126,10 @@ def checks():
             date_heure_reception_alerte
         from operations as op
         join (
-            select 'Corsen 2017/1305' cross_sitrep ,'2017-12-16 11:57:00+00' expected_time UNION
-            select 'Corsen 2018/1503' cross_sitrep ,'2018-10-13 08:13:00+00' expected_time UNION
-            select 'Étel 2018/3473' cross_sitrep ,'2018-12-20 08:51:00+00' expected_time UNION
-            select 'Corsen 2019/2604' cross_sitrep ,'2019-12-20 13:26:00+00' expected_time
+            select 'Corsen SAR 2017/1305' cross_sitrep ,'2017-12-16 11:57:00+00' expected_time UNION
+            select 'Corsen SAR 2018/1503' cross_sitrep ,'2018-10-13 08:13:00+00' expected_time UNION
+            select 'Étel SAR 2018/3473' cross_sitrep ,'2018-12-20 08:51:00+00' expected_time UNION
+            select 'Corsen SAR 2019/2604' cross_sitrep ,'2019-12-20 13:26:00+00' expected_time
         ) t on t.cross_sitrep = op.cross_sitrep and op.date_heure_reception_alerte::text = t.expected_time
     ) t
     """,
@@ -140,3 +140,33 @@
         where concerne_snosan and avec_clandestins
     """,
 }
+
+
+def secmar_csv_checks():
+    return {
+        "operations_count_2021": """
+        select count(1) between 16800 and 16820
+        from operations
+        where extract(year from date_heure_reception_alerte) = 2021
+        """,
+        "operations_count_up_to_2021": """
+        select count(1) between 321500 and 321600
+        from operations
+        where extract(year from date_heure_reception_alerte) <= 2021
+        """,
+        "operations_count_cross_2021": """
+        select count(distinct "cross") = 11
+        from operations
+        where extract(year from date_heure_reception_alerte) = 2021
+        """,
+        "operations_count_2021_from_secmar_csv": """
+        select count(1) between 14670 and 14680
+        from operations
+        where extract(year from date_heure_reception_alerte) = 2021 and operation_id in (select secmar_operation_id from secmar_csv_operation)
+        """,
+        "est_metropolitain": """
+        select string_agg(distinct "cross"::varchar, '|' order by "cross"::varchar) = 'Antilles-Guyane|Gris-Nez|Guadeloupe|Guyane|La Réunion|Martinique|Mayotte|Nouvelle-Calédonie|Polynésie'
+        from operations
+        where not est_metropolitain
+        """,
+    }

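Each of these queries returns a single boolean row, and the CheckOperator tasks created in extract_secmar.py fail whenever that value is false. A minimal way to exercise one check outside of a DAG run, assuming the same postgresql_local Airflow connection is configured locally:

from airflow.hooks.postgres_hook import PostgresHook

from secmar_checks import secmar_csv_checks

row = PostgresHook("postgresql_local").get_first(secmar_csv_checks()["operations_count_2021"])
print(row)  # (True,) when the 2021 operation count falls in the expected range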
embulk/secmar_csv_bilan.yml.liquid

+26
in:
  type: file
  path_prefix: {{ env.EMBULK_FILEPATH }}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
      - {name: operation_id, type: string}
      - {name: SEC_RESULTAT_HUMAIN_resultat_humain_id, type: string}
      - {name: SEC_RESULTAT_HUMAIN_cat_personne_id, type: string}
      - {name: SEC_RESULTAT_HUMAIN_nb, type: long}
      - {name: SEC_RESULTAT_HUMAIN_dont_nb_blesse, type: long}
      - {name: operation_long_name, type: string}
      - {name: operation_version, type: long}
      - {name: secmar_operation_id, type: long}
{% include 'datasources/out_postgresql' %}
  table: secmar_csv_bilan
  mode: truncate_insert

embulk/secmar_csv_flotteur.yml.liquid

+29
in:
  type: file
  path_prefix: {{ env.EMBULK_FILEPATH }}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
      - {name: operation_id, type: string}
      - {name: SEC_FLOTTEUR_IMPLIQUE_mer_force, type: long}
      - {name: SEC_FLOTTEUR_IMPLIQUE_type_flotteur_id, type: string}
      - {name: SEC_FLOTTEUR_IMPLIQUE_pavillon_id, type: string}
      - {name: SEC_FLOTTEUR_IMPLIQUE_num_immat_fr, type: string}
      - {name: SEC_FLOTTEUR_IMPLIQUE_num_imo, type: double}
      - {name: SEC_FLOTTEUR_IMPLIQUE_nom, type: string}
      - {name: SEC_FLOTTEUR_IMPLIQUE_resultat_flotteur_id, type: string}
      - {name: operation_long_name, type: string}
      - {name: operation_version, type: long}
      - {name: secmar_operation_id, type: long}
{% include 'datasources/out_postgresql' %}
  table: secmar_csv_flotteur
  mode: truncate_insert

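Both templates read their input path from {{ env.EMBULK_FILEPATH }}, which the DAG's embulk_import helper points at the <table>.cleaned.csv file inside secmar_csv.AGGREGATE_FOLDER. The helpers.embulk_run wrapper itself is not part of this diff; a plausible minimal sketch, where the BashOperator wrapping, the task id, and embulk being on PATH are all assumptions:

import os

from airflow.operators.bash_operator import BashOperator


def embulk_run(dag, script, env):
    # Embulk renders embulk/<script>.yml.liquid and resolves
    # {{ env.EMBULK_FILEPATH }} from the subprocess environment.
    merged_env = dict(os.environ)
    merged_env.update(env)
    return BashOperator(
        task_id=script,
        bash_command="embulk run embulk/%s.yml.liquid" % script,
        env=merged_env,  # BashOperator replaces the environment when env is given
        dag=dag,
    )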