-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain_api.py
More file actions
143 lines (116 loc) · 5.57 KB
/
main_api.py
File metadata and controls
143 lines (116 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import re
from typing import Optional, List, Dict
import os
import asyncio
import aiohttp
import asyncpg
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('API_KEY')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
def check_okved(okveds_for_check: List[str]) -> None:
"""Проверяет список ОКВЭД на валидность"""
if not okveds_for_check:
raise ValueError("Необходимо ввести ОКВЭД")
pattern = re.compile(r'^\d{2}(\.\d{2}(\.\d{1,2})?)?$')
for okved_for_check in okveds_for_check:
if not pattern.match(okved_for_check):
raise ValueError(f"Необходимо правильно ввести список кодов ОКВЭД. Неверный ОКВЭД: {okved_for_check}")
def check_region(region_for_check: int) -> None:
"""Проверяет код региона на валидность"""
if not region_for_check:
raise ValueError("Необходимо ввести код региона")
if not isinstance(region_for_check, int):
raise ValueError("Необходимо правильно ввести код региона.")
async def get_egrul_data(okved_list: List[str], region: int) -> Optional[dict]:
"""
Получает данные их API https://api.ofdata.ru по заданным параментрам
"""
check_okved(okved_list)
check_region(region)
page = 1
limit = 100
results = {}
try:
async with aiohttp.ClientSession() as session:
for okved in okved_list:
results[okved] = []
while True:
url = f'https://api.ofdata.ru/v2/search?key={api_key}&by=okved&obj=org&query={okved}®ion={region}' \
f'&limit={limit}&page={page}'
async with session.get(url) as response:
if response.status == 200:
page_data = await response.json()
result_data = page_data.get('data', [])
if result_data:
results[okved].extend(result_data['Записи'])
page += 1
total_pages = result_data.get('СтрВсего')
if page > total_pages:
break
else:
print("В ответе не найдены необходимые данные")
else:
print(f"API вернул код: {response.status_code}")
return results
except aiohttp.ClientError as ex:
raise ValueError(f'Ошибка соединения: {ex}')
except Exception as ex:
raise ValueError(f'Ошибка подлключения к API: {ex}')
async def insert_data_to_database(companies_data: Dict) -> None:
"""
Создает таблицу в БД, если она не существует и вносит в нее полученные из API данные.
"""
try:
con = await asyncpg.connect(
database=db_name,
user=db_user,
password=db_password,
host=db_host,
port=db_port
)
async with con.transaction():
await con.execute('''CREATE TABLE IF NOT EXISTS companies(
id SERIAL PRIMARY KEY,
company_name VARCHAR(155),
okved VARCHAR(155),
inn VARCHAR(10),
kpp VARCHAR(10),
legal_address VARCHAR(155)
);''')
print('Таблица создана успешно.')
for okved in companies_data:
if companies_data.get(okved):
for company in companies_data.get(okved):
company_name = company.get('НаимПолн')
okved_comp = okved
inn = company.get('ИНН')
kpp = company.get('КПП')
legal_address = company.get('ЮрАдрес')
await con.execute(
"INSERT INTO companies (company_name, okved, inn, kpp, legal_address) "
"VALUES ($1, $2, $3, $4, $5)",
company_name, okved_comp, inn, kpp, legal_address)
print("Записи внесены в БД успешно.")
except asyncpg.PostgresError as ex:
print(f"DataBase Error: {ex}")
finally:
await con.close()
async def main():
"""
Приложение обращается к API сервиса 'ofdata.ru' для получения сведений по компаниям в соотвествии с заданным
списком ОКВЭД и регионом.
После успешнрого получения данных, эти сведения (название компании, код ОКВЭД, ИНН, КПП и место регистрации ЮЛ)
вносятся в базу данных.
"""
okved = ['62', '62.01', '62.02', '62.02.1', '62.02.2', '62.02.3', '62.02.4', '62.02.9', '62.03', '62.03.11',
'62.03.12', '62.03.13', '62.03.19', '62.09']
region = 27
data = await get_egrul_data(okved_list=okved, region=region)
await insert_data_to_database(data)
if __name__ == '__main__':
asyncio.run(main())