Skip to content

Commit a266e5e

Browse files
authored
Merge pull request #4212 from hove-io/process_poi_and_push_to_asgard_s3
[Tyr Worker] Process poi and push to asgard s3
2 parents 7bec236 + d804aa5 commit a266e5e

File tree

5 files changed

+117
-2
lines changed

5 files changed

+117
-2
lines changed

docker/debian8/Dockerfile-master

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ RUN apt-get remove --yes cmake \
2727

2828
# Python 'requests' package handle its own CA certificate list
2929
# Let's force it to use the OS's list
30-
ENV REQUESTS_CA_BUNDLE /etc/ssl/certs
30+
ENV REQUESTS_CA_BUNDLE /etc/ssl/certs/ca-certificates.crt
3131

3232
# install rustup
3333
ENV RUSTUP_HOME=/usr/local/rustup \

source/tyr/tyr/binarisation.py

+43
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import datetime
3939
import shutil
4040
from functools import wraps
41+
import subprocess
4142

4243
from flask import current_app
4344
from shapely.geometry import MultiPolygon
@@ -58,6 +59,8 @@
5859

5960
from tyr.minio import MinioWrapper
6061

62+
from tyr.poi_to_excluded_zones import poi_to_excluded_zones
63+
6164

6265
def unzip_if_needed(filename):
6366
if not os.path.isdir(filename):
@@ -1233,6 +1236,46 @@ def gtfs2s3(self, instance_config, filename, job_id, dataset_uid):
12331236
_inner_2s3(self, "gtfs", instance_config, filename, job_id, dataset_uid)
12341237

12351238

1239+
@celery.task(bind=True)
def poi2asgard(self, instance_config, filename, job_id, dataset_uid):
    """Extract the excluded zones of a POI archive and sync them to the Asgard S3 bucket.

    The excluded zones are written as JSON files into a local
    ``excluded_zones`` directory (recreated from scratch on every run), then
    pushed with ``aws s3 sync`` to ``s3://<MINIO_ASGARD_BUCKET_NAME>/excluded_zones``.

    :param instance_config: unused here; kept so the signature matches the other data tasks
    :param filename: path of the POI zip archive handed to ``poi_to_excluded_zones``
    :param job_id: id of the ``models.Job`` this task belongs to
    :param dataset_uid: dataset identifier forwarded to ``collect_metric``
    :raises Exception: when the Asgard bucket is not configured or when
        ``aws s3 sync`` exits with a non-zero status; any error raised while
        extracting the zones is re-raised as well. In every failure case the
        job and the dataset are flagged as ``failed`` first.
    """
    job = models.Job.query.get(job_id)
    dataset = _retrieve_dataset_and_set_state("poi", job.id)
    instance = job.instance
    logger = get_instance_logger(instance, task_id=job_id)

    # NOTE(review): this path is relative to the worker's current directory and
    # is not instance specific - confirm that two instances cannot run this
    # task concurrently in the same working directory.
    excluded_zone_dir = "excluded_zones"

    try:
        # The extraction lives inside the ``try`` so that a broken archive
        # flags the job/dataset as failed instead of leaving their state untouched.
        if os.path.isdir(excluded_zone_dir):
            shutil.rmtree(excluded_zone_dir)
        os.mkdir(excluded_zone_dir)
        poi_to_excluded_zones(filename, excluded_zone_dir, instance.name)

        with collect_metric("poi2Asgard", job, dataset_uid):
            asgard_bucket = current_app.config.get('MINIO_ASGARD_BUCKET_NAME', None)
            if not asgard_bucket:
                raise Exception("Asgard Bucket is None")

            # Built as an argument list (no shell, no str.split) so that the
            # values can never be re-tokenised on whitespace.
            command = [
                "env",
                "REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt",
                "aws",
                "s3",
                "sync",
                "./{}".format(excluded_zone_dir),
                "s3://{}/excluded_zones".format(asgard_bucket),
            ]
            # stderr has to be piped as well, otherwise communicate() always
            # returns None for it and failures go unnoticed.
            process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output, error = process.communicate()
            # The exit status is the reliable failure signal: the CLI may
            # write warnings on stderr even when the sync succeeded.
            if process.returncode != 0:
                raise Exception(
                    "Error occurred when putting excluded zones to asgard (exit code {}): {}".format(
                        process.returncode, error
                    )
                )
    except:
        # Deliberately broad: whatever interrupted the task, record the
        # failure, then let the original exception propagate.
        logger.exception("")
        job.state = "failed"
        dataset.state = "failed"
        raise
    finally:
        models.db.session.commit()
1277+
1278+
12361279
def _inner_2s3(self, dataset_type, instance_config, filename, job_id, dataset_uid):
12371280
job = models.Job.query.get(job_id)
12381281
dataset = _retrieve_dataset_and_set_state(dataset_type, job.id)

source/tyr/tyr/default_settings.py

+2
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@
156156

157157
MINIO_BUCKET_NAME = os.getenv('TYR_MINIO_BUCKET_NAME', None)
158158

159+
MINIO_ASGARD_BUCKET_NAME = os.getenv('TYR_MINIO_ASGARD_BUCKET_NAME', None)
160+
159161
MINIO_USE_IAM_PROVIDER = os.getenv('TYR_MINIO_USE_IAM_PROVIDER', 'true').lower() in ['1', 'true', 'yes']
160162

161163
MINIO_ACCESS_KEY = os.getenv('TYR_MINIO_ACCESS_KEY', None)
+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import argparse
2+
import csv
3+
import json
4+
import logging
5+
6+
7+
def poi_to_excluded_zones(poi_file, output_dir, instance_name):
    """Export the excluded zones declared in a POI archive as JSON files.

    The archive is extracted into ``tmp/poi_<instance_name>`` (relative to the
    current working directory) and three ``;``-separated files are read:

    * ``poi_properties.txt``: rows whose second column is ``excluded_zones``
      (case-insensitive) carry, in their third column, a JSON list of zones
      for the POI named in the first column;
    * ``poi.txt``: first column is the POI id, eighth column its geometry id;
    * ``geometries.txt``: first column is the geometry id, second its shape.

    One ``<poi_id>_<index>_<instance_name>.json`` file is written per zone. It
    contains the zone's own keys plus ``id`` and ``shape``. A POI whose
    geometry id or shape cannot be resolved is logged and skipped.

    :param poi_file: path (or file object) of the POI zip archive
    :param output_dir: directory receiving the JSON files, created if missing
    :param instance_name: name used for the extraction directory and as
        suffix of every generated id
    """
    import os
    import zipfile

    # The module-level ``logger`` name is only bound when this file is run as
    # a script, so a logger is resolved locally to keep the function usable
    # when the module is imported.
    log = logging.getLogger(__name__)

    tmp_path = "tmp/poi_{}".format(instance_name)
    with zipfile.ZipFile(poi_file, 'r') as zip_ref:
        zip_ref.extractall(tmp_path)

    # get excluded zones: poi id -> list of zone descriptions
    excluded_zones = {}
    with open(os.path.join(tmp_path, "poi_properties.txt")) as csvfile:
        for row in csv.reader(csvfile, delimiter=';', quotechar='"'):
            # blank or truncated rows cannot describe an excluded zone
            if len(row) < 3 or row[1].lower() != "excluded_zones":
                continue
            excluded_zones[row[0]] = json.loads(row[2])

    # find geometry id: poi id -> geometry id
    excluded_geometries_ids = {}
    with open(os.path.join(tmp_path, "poi.txt")) as csvfile:
        for row in csv.reader(csvfile, delimiter=';', quotechar='"'):
            if len(row) < 8 or row[0] not in excluded_zones:
                continue
            excluded_geometries_ids[row[0]] = row[7]

    # Compared as sets: only the membership matters, not the key order.
    if set(excluded_geometries_ids) != set(excluded_zones):
        log.error("not all excluded zone's pois are found in poi.txt")
        log.error("excluded_geometries_ids: {}".format(list(excluded_geometries_ids)))
        log.error("excluded_zones: {}".format(list(excluded_zones)))

    # read geometries: geometry id -> shape
    geometries_shapes = {}
    with open(os.path.join(tmp_path, "geometries.txt")) as csvfile:
        for row in csv.reader(csvfile, delimiter=';', quotechar='"'):
            if len(row) < 2:
                continue
            geometries_shapes[row[0]] = row[1]

    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)

    for poi_id, zones in excluded_zones.items():
        geometry_id = excluded_geometries_ids.get(poi_id)
        if not geometry_id:
            log.error("{} could not be found in poi.txt".format(poi_id))
            continue
        shape = geometries_shapes.get(geometry_id)
        if not shape:
            log.error("{} could not be found in geometries.txt".format(geometry_id))
            # a zone without shape is unusable, do not export it
            continue

        for i, zone in enumerate(zones):
            output_id = "{}_{}_{}".format(poi_id, i, instance_name)
            output = {'id': output_id}
            output.update(zone)
            output["shape"] = shape
            with open(os.path.join(output_dir, "{}.json".format(output_id)), "w") as output_file:
                json.dump(output, output_file)
60+
61+
62+
if __name__ == '__main__':
    # Command line entry point: export the excluded zones of the archive given
    # with --poi into ./excluded_zones, using a placeholder instance name.
    import os

    parser = argparse.ArgumentParser()
    # Mandatory: without it None would be handed to the zip reader.
    parser.add_argument('--poi', required=True, help='poi zip')
    args = parser.parse_args()

    # Make sure error reports are actually printed when run from a shell.
    logging.basicConfig()
    # Bound at module level so that code of this module logging through the
    # global ``logger`` name can resolve it.
    logger = logging.getLogger(__name__)

    # The JSON files are opened for writing inside this directory, so it has
    # to exist before the export starts.
    output_dir = "excluded_zones"
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)

    poi_to_excluded_zones(args.poi, output_dir, "dummy_instance")

source/tyr/tyr/tasks.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
fusio2s3,
6161
gtfs2s3,
6262
zip_if_needed,
63+
poi2asgard,
6364
)
6465
from tyr.binarisation import reload_data, move_to_backupdirectory
6566
from tyr import celery
@@ -207,7 +208,8 @@ def process_ed2nav():
207208
loki_data_source, instance.name
208209
)
209210
)
210-
211+
if dataset.type == "poi":
212+
actions.append(poi2asgard.si(instance_config, filename, dataset_uid=dataset.uid))
211213
actions.append(task[dataset.type].si(instance_config, filename, dataset_uid=dataset.uid))
212214
else:
213215
# unknown type, we skip it

0 commit comments

Comments
 (0)