-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
63 lines (44 loc) · 1.74 KB
/
main.py
File metadata and controls
63 lines (44 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python
# Dataflow driver script: enumerates objects in a GCS bucket and fans them
# out through a Beam pipeline that applies ProcessFile to each object URL.
from pathlib import PurePath
from google.cloud import storage
from utils.beamutils import ProcessFile
# NOTE(review): utils.logger re-exports the stdlib `logging` module — confirm.
from utils.logger import logging
# Module-level logger named after this module so handlers can filter on it.
logger = logging.getLogger(__name__)
def run(pipeline_args, known_args):
    """Invoked by the Beam runner.

    Lists every GCS object under ``known_args.gcs_url`` and runs a Beam
    pipeline that applies ``ProcessFile`` to each resulting ``gs://`` URL.

    Args:
        pipeline_args: raw ``--key=value`` flags forwarded to the Beam runner.
        known_args: namespace with ``gcs_url``, ``bq_path`` and
            ``dataflow_bucket`` string attributes.
    """
    # Lazy import: Dataflow workers re-import this module, and keeping Beam
    # out of module scope lets the file be imported where Beam is absent.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    pipeline_options = PipelineOptions(
        [
            '--no_use_public_ips',
            '--disk_size_gb=24',
            '--number_of_worker_harness_threads=1',
            *pipeline_args,
        ],
        temp_location=f'gs://{known_args.dataflow_bucket}/TEMP/',
        staging_location=f'gs://{known_args.dataflow_bucket}/STAGING/',
    )
    # Parse '--key=value' flags into a dict. maxsplit=1 keeps values that
    # themselves contain '=' intact — the previous unbounded split() made
    # dict() raise ValueError on such flags.
    parsed_flags = dict(z[2:].split('=', 1) for z in pipeline_args if '=' in z)
    beam_args = {
        'bq_path': known_args.bq_path,
        'job_name': parsed_flags.get('job_name'),
    }
    # Decompose 'gs://bucket/prefix/...': parts[0] is 'gs:', parts[1] the
    # bucket name, and the remainder is the object-name prefix.
    _, bucket_name, *prefix_parts = PurePath(known_args.gcs_url).parts
    blobs = storage.Client().list_blobs(bucket_name, prefix='/'.join(prefix_parts))
    gcs_urls = [f'gs://{bucket_name}/{blob.name}' for blob in blobs]
    # Use the module logger (was the root `logging` module, which bypasses
    # this file's logger configuration).
    logger.info('Files to process: %s', len(gcs_urls))
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'Create Beam' >> beam.Create(gcs_urls)
            | 'Process Raster' >> beam.ParDo(ProcessFile(beam_args))
        )
if __name__ == '__main__':
    import argparse

    # The three flags below are consumed here; every unrecognized flag is
    # forwarded untouched to the Beam runner via parse_known_args().
    parser = argparse.ArgumentParser(
        description='Process GCS objects with an Apache Beam pipeline.')
    # required=True fails fast with a clear usage error instead of letting a
    # missing flag surface later as TypeError inside run() (PurePath(None)).
    parser.add_argument('--gcs_url', type=str, required=True,
                        help='gs:// URL prefix of the input objects.')
    parser.add_argument('--bq_path', type=str, required=True,
                        help='BigQuery destination path.')
    parser.add_argument('--dataflow_bucket', type=str, required=True,
                        help='Bucket for Dataflow TEMP/ and STAGING/ files.')
    known_args, pipeline_args = parser.parse_known_args()
    run(pipeline_args, known_args)