|
| 1 | +__all__ = ('OracledbOperator',) |
| 2 | + |
| 3 | +import oracledb |
| 4 | +from sqlalchemy import create_engine |
| 5 | +from sqlalchemy.sql import text |
| 6 | + |
| 7 | +from .base import PasswordVault |
| 8 | +from ..logger import LOG |
| 9 | + |
| 10 | +LOG.debug("oracledb version = %s" % oracledb.__version__) |
| 11 | + |
| 12 | + |
| 13 | +class OracledbOperator: |
| 14 | + def __init__(self, db_config, **kwargs): |
| 15 | + """example of db_config: |
| 16 | + { |
| 17 | + "host": "192.168.1.100", |
| 18 | + "port": 1521, |
| 19 | + "user": "PT_INDEX", |
| 20 | + "password": "vault_key_or_plain", |
| 21 | + "service_name": "orcl", # 推荐使用 service_name |
| 22 | + "sid": "orcl", # 或使用 sid |
| 23 | + "vault_type": "...", |
| 24 | + "vault_config": {...}, |
| 25 | + "lib_dir": "/opt/oracle/instantclient" # optional, use THICK mode if defined. |
| 26 | + } |
| 27 | + """ |
| 28 | + |
| 29 | + password_vault = PasswordVault.get_vault(db_config.get('vault_type'), db_config.get('vault_config')) |
| 30 | + self._config = { |
| 31 | + 'host': db_config['host'], |
| 32 | + 'port': db_config['port'], |
| 33 | + 'user': db_config['user'], |
| 34 | + 'password': password_vault.get_password(db_config.get('password')), |
| 35 | + } |
| 36 | + |
| 37 | + if 'lib_dir' in db_config: # use Thick mode |
| 38 | + try: |
| 39 | + oracledb.init_oracle_client(lib_dir=db_config['lib_dir']) |
| 40 | + LOG.info("Oracle client initialized in THICK mode from: %s" % db_config["lib_dir"]) |
| 41 | + except Exception as e: |
| 42 | + LOG.warning(f"Warning: {e}") |
| 43 | + raise RuntimeError(f"Failed to initialize Oracle client: {e}") |
| 44 | + |
| 45 | + service_name = db_config.get("service_name") |
| 46 | + sid = db_config.get("sid") |
| 47 | + |
| 48 | + if service_name: # using service_name (recommended) |
| 49 | + dsn = oracledb.makedsn(db_config["host"], db_config["port"], service_name=service_name) |
| 50 | + elif sid: # using SID |
| 51 | + dsn = oracledb.makedsn(db_config["host"], db_config["port"], sid=sid) |
| 52 | + else: |
| 53 | + raise ValueError("Oracle config must specify service_name or sid") |
| 54 | + |
| 55 | + self._config["dsn"] = dsn |
| 56 | + try: |
| 57 | + self.engine = create_engine( |
| 58 | + "oracle+oracledb://{user}:{password}@".format(**self._config), |
| 59 | + connect_args={"dsn": dsn}, |
| 60 | + pool_size=20, max_overflow=10, pool_pre_ping=True, **kwargs |
| 61 | + ) |
| 62 | + msg = "OracleDB connected: {host}:{port}".format(**self._config) |
| 63 | + print(msg) |
| 64 | + except Exception as e: |
| 65 | + LOG.error(e) |
| 66 | + raise RuntimeError(f"Failed to connect to OracleDB") |
| 67 | + |
| 68 | + @property |
| 69 | + def connection(self): |
| 70 | + return self.engine |
| 71 | + |
| 72 | + def execute_query(self, sql, *args, **kwargs): |
| 73 | + with self.engine.connect() as conn: |
| 74 | + cur = conn.execute(text(sql), *args, **kwargs) |
| 75 | + return cur |
| 76 | + |
| 77 | + @property |
| 78 | + def connection_str(self) -> str: |
| 79 | + return "oracle://{user}@{host}:{port}".format(**self._config) |
0 commit comments