@@ -1125,11 +1125,17 @@ def poi2mimir(self, instance_name, input, autocomplete_version, job_id=None, dat
1125
1125
@celery .task (bind = True )
1126
1126
def fusio2s3 (self , instance_config , filename , job_id , dataset_uid ):
1127
1127
"""Enrich the fusio file and launch fusio2s3.

Creates a ``for_loki`` directory (mode 0o755) inside the directory that
contains ``filename``, then runs ``enrich_ntfs_with_addresses`` followed by
``split_trip_geometries``; each step receives the file returned by the
previous one and writes into ``for_loki``. The file returned by the last
step is handed to ``_inner_2s3`` with the "fusio" dataset type.

NOTE(review): ``os.makedirs`` is called without ``exist_ok``, so it raises
if ``for_loki`` already exists -- confirm each job gets a fresh directory.
"""
1128
- filename = enrich_ntfs_with_addresses ("fusio" , instance_config , filename , job_id , dataset_uid )
1128
+
1129
+ root_dir = os .path .dirname (filename )
1130
+ loki_dir = os .path .join (root_dir , "for_loki" )
1131
+ os .makedirs (loki_dir , 0o755 )
1132
+
1133
+ filename = enrich_ntfs_with_addresses ("fusio" , instance_config , loki_dir , filename , job_id , dataset_uid )
1134
+ filename = split_trip_geometries (loki_dir , filename , job_id , dataset_uid )
1129
1135
_inner_2s3 (self , "fusio" , instance_config , filename , job_id , dataset_uid )
1130
1136
1131
1137
1132
- def enrich_ntfs_with_addresses (dataset_type , instance_config , filename , job_id , dataset_uid ):
1138
+ def enrich_ntfs_with_addresses (dataset_type , instance_config , loki_dir , filename , job_id , dataset_uid ):
1133
1139
"""launch enrich-ntfs-with-addresses"""
1134
1140
1135
1141
job = models .Job .query .get (job_id )
@@ -1139,13 +1145,8 @@ def enrich_ntfs_with_addresses(dataset_type, instance_config, filename, job_id,
1139
1145
logger = get_instance_logger (instance , task_id = job_id )
1140
1146
filename = zip_if_needed (filename )
1141
1147
1142
- file_dir = os .path .dirname (filename )
1143
- file_basename = os .path .basename (filename )
1144
- output_dir = file_dir + "/for_loki"
1145
- os .makedirs (output_dir , 0o755 )
1146
- output = output_dir + "/" + file_basename
1147
-
1148
- previous_ntfs_path = output_dir + "/previous_ntfs.zip"
1148
+ output = os .path .join (loki_dir , "with_addresses.zip" )
1149
+ previous_ntfs_path = os .path .join (loki_dir , "/previous_ntfs.zip" )
1149
1150
1150
1151
file_key = "{coverage}/{dataset_type}.zip" .format (coverage = instance_config .name , dataset_type = dataset_type )
1151
1152
@@ -1171,11 +1172,50 @@ def enrich_ntfs_with_addresses(dataset_type, instance_config, filename, job_id,
1171
1172
if use_previous_ntfs :
1172
1173
params .extend (["--previous-ntfs" , previous_ntfs_path ])
1173
1174
1175
+ binary = "enrich-ntfs-with-addresses"
1176
+
1177
+ res = None
1178
+ with collect_metric (binary , job , dataset_uid ):
1179
+ res = launch_exec (binary , params , logger )
1180
+ if res != 0 :
1181
+ raise ValueError ("{} failed" .format (binary ))
1182
+ except :
1183
+ logger .exception ("" )
1184
+ job .state = "failed"
1185
+ dataset .state = "failed"
1186
+ raise
1187
+ finally :
1188
+ models .db .session .commit ()
1189
+
1190
+ return output
1191
+
1192
+
1193
+ def split_trip_geometries (loki_dir , filename , job_id , dataset_uid ):
1194
+ """launch split-trip-geometries"""
1195
+
1196
+ job = models .Job .query .get (job_id )
1197
+ dataset = _retrieve_dataset_and_set_state ("fusio" , job .id )
1198
+ instance = job .instance
1199
+
1200
+ logger = get_instance_logger (instance , task_id = job_id )
1201
+ filename = zip_if_needed (filename )
1202
+
1203
+ output = os .path .join (loki_dir , "with_split_trip_geometries.zip" )
1204
+
1205
+ try :
1206
+ params = [
1207
+ "--input" ,
1208
+ filename ,
1209
+ "--output" ,
1210
+ output ,
1211
+ ]
1212
+
1213
+ binary = "split-trip-geometries"
1174
1214
res = None
1175
- with collect_metric ("enrich-ntfs-with-addresses" , job , dataset_uid ):
1176
- res = launch_exec ("enrich-ntfs-with-addresses" , params , logger )
1215
+ with collect_metric (binary , job , dataset_uid ):
1216
+ res = launch_exec (binary , params , logger )
1177
1217
if res != 0 :
1178
- raise ValueError ("enrich-ntfs-with-addresses failed" )
1218
+ raise ValueError ("{} failed" . format ( binary ) )
1179
1219
except :
1180
1220
logger .exception ("" )
1181
1221
job .state = "failed"
0 commit comments