|
| 1 | +#!/usr/bin/env python |
| 2 | +# Copyright 2020 Google LLC |
| 3 | +# |
| 4 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +# you may not use this file except in compliance with the License. |
| 6 | +# You may obtain a copy of the License at |
| 7 | +# |
| 8 | +# https://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +# |
| 10 | +# Unless required by applicable law or agreed to in writing, software |
| 11 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +# See the License for the specific language governing permissions and |
| 14 | +# limitations under the License. |
| 15 | +"""Shows how to download in parallel a set of reports from a list of accounts. |
| 16 | +
|
| 17 | +If you need to obtain a list of accounts, please see the |
| 18 | +account_management/get_account_hierarchy.py or |
| 19 | +account_management/list_accessible_customers.py examples. |
| 20 | +""" |
| 21 | + |
| 22 | +import argparse |
| 23 | +from itertools import product |
| 24 | +import multiprocessing |
| 25 | +import sys |
| 26 | +import time |
| 27 | + |
| 28 | +from google.ads.google_ads.client import GoogleAdsClient |
| 29 | +from google.ads.google_ads.errors import GoogleAdsException |
| 30 | + |
# Maximum number of processes to spawn: one per available CPU core.
MAX_PROCESSES = multiprocessing.cpu_count()
# Base wait between retries, in seconds. The actual sleep grows linearly:
# retry_count * BACKOFF_FACTOR (see _issue_search_request).
BACKOFF_FACTOR = 5
# Maximum number of retries after the initial request fails.
MAX_RETRIES = 5
| 37 | + |
| 38 | + |
def main(client, customer_ids):
    """Downloads reports for every customer ID / query pair in parallel.

    Args:
        client: an initialized GoogleAdsClient instance.
        customer_ids: an array of client customer IDs.
    """
    # Define the GAQL query strings to run for each customer ID.
    campaign_query = """
        SELECT campaign.id, metrics.impressions, metrics.clicks
        FROM campaign
        WHERE segments.date DURING LAST_30_DAYS"""
    ad_group_query = """
        SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks
        FROM ad_group
        WHERE segments.date DURING LAST_30_DAYS"""

    inputs = _generate_inputs(
        client, customer_ids, [campaign_query, ad_group_query]
    )
    with multiprocessing.Pool(MAX_PROCESSES) as pool:
        # Call _issue_search_request on each input, parallelizing the work
        # across processes in the pool. Each result is a (success, payload)
        # tuple produced by _issue_search_request.
        results = pool.starmap(_issue_search_request, inputs)

    # Partition our results into successful and failed results.
    successes = [payload for ok, payload in results if ok]
    failures = [payload for ok, payload in results if not ok]

    # Output results.
    print(
        f'Total successful results: {len(successes)}\n'
        f'Total failed results: {len(failures)}\n'
    )

    # Only print the section header when there is something to report.
    if successes:
        print("Successes:")
    for success in successes:
        # success["results"] represents an array of result strings for one
        # customer ID / query combination.
        print("\n".join(success["results"]))

    if failures:
        print("Failures:")
    for failure in failures:
        ex = failure["exception"]
        print(
            f'Request with ID "{ex.request_id}" failed with status '
            f'"{ex.error.code().name}" for customer_id '
            f'{failure["customer_id"]} and query "{failure["query"]}" and '
            'includes the following errors:'
        )
        for error in ex.failure.errors:
            print(f'\tError with message "{error.message}".')
            if error.location:
                for (
                    field_path_element
                ) in error.location.field_path_elements:
                    print(f'\t\tOn field: {field_path_element.field_name}')
| 103 | + |
| 104 | + |
def _issue_search_request(client, customer_id, query):
    """Runs a single streaming search for one customer ID / query pair.

    Retries on GoogleAdsException until MAX_RETRIES attempts have failed,
    sleeping attempt_number * BACKOFF_FACTOR seconds between attempts.

    Args:
        client: an initialized GoogleAdsClient instance.
        customer_id: a client customer ID str.
        query: a GAQL query str.

    Returns:
        A (success, payload) tuple. On success the payload is
        {"results": [str, ...]}; on failure it carries the exception plus
        the customer ID and query that triggered it.
    """
    ga_service = client.get_service("GoogleAdsService", version="v6")
    # Whether the query selects ad group data determines the row format.
    include_ad_group = "ad_group.id" in query
    attempts = 0
    # Keep retrying until a response arrives or the retry budget is spent.
    while True:
        try:
            stream = ga_service.search_stream(customer_id, query)
            # GoogleAdsRow objects raise a PicklingError when returned from
            # a pool worker, so render every row into a plain str instead.
            rows_as_text = []
            for batch in stream:
                for row in batch.results:
                    prefix = (
                        f'Ad Group ID {row.ad_group.id} in '
                        if include_ad_group
                        else ''
                    )
                    rows_as_text.append(
                        f'{prefix}'
                        f'Campaign ID {row.campaign.id} '
                        f'had {row.metrics.impressions} impressions '
                        f'and {row.metrics.clicks} clicks.'
                    )
            return (True, {"results": rows_as_text})
        except GoogleAdsException as ex:
            # This example retries on all GoogleAdsExceptions. In practice,
            # developers might want to limit retries to only those error
            # codes they deem retriable.
            attempts += 1
            if attempts > MAX_RETRIES:
                return (
                    False,
                    {
                        "exception": ex,
                        "customer_id": customer_id,
                        "query": query,
                    },
                )
            time.sleep(attempts * BACKOFF_FACTOR)
| 157 | + |
| 158 | + |
| 159 | +def _generate_inputs(client, customer_ids, queries): |
| 160 | + """Generates all inputs to feed into search requests. |
| 161 | +
|
| 162 | + A GoogleAdsService instance cannot be serialized with pickle for parallel |
| 163 | + processing, but a GoogleAdsClient can be, so we pass the client to the |
| 164 | + pool task which will then get the GoogleAdsService instance. |
| 165 | +
|
| 166 | + Args: |
| 167 | + client: An initialized GoogleAdsClient instance. |
| 168 | + customer_ids: A list of str client customer IDs. |
| 169 | + queries: A list of str GAQL queries. |
| 170 | + """ |
| 171 | + return product([client], customer_ids, queries) |
| 172 | + |
| 173 | + |
if __name__ == "__main__":
    # GoogleAdsClient will read the google-ads.yaml configuration file in the
    # home directory if none is specified.
    ads_client = GoogleAdsClient.load_from_storage()

    arg_parser = argparse.ArgumentParser(
        description=(
            "Download a set of reports in parallel from a list of accounts."
        )
    )
    # The following argument(s) should be provided to run the example.
    arg_parser.add_argument(
        "-c",
        "--customer_ids",
        nargs="+",
        type=str,
        required=True,
        help="The Google Ads customer IDs.",
    )
    parsed_args = arg_parser.parse_args()

    main(ads_client, parsed_args.customer_ids)
0 commit comments