Browser API

1. Authentication and response format

1.1 Authentication

Every request must carry the following header:

  • X-API-KEY: A unique UUID that we assign to you. All operations are scoped to this key.

Example:

curl --location --request PATCH 'http://domain:port/v2/env/open_env' \
--header 'X-API-KEY: API-KEY' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Accept: */*' \
--header 'Host: domain:port' \
--header 'Connection: keep-alive'
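
The same authenticated request can be prepared from Python. The following is a minimal sketch using only the standard library. `build_open_env_request` is a hypothetical helper name, and the base URL and key passed to it are placeholders, so the request is built but not sent.

```python
import urllib.request


def build_open_env_request(base_url: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) an authenticated PATCH request to /v2/env/open_env."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/v2/env/open_env",
        headers={"X-API-KEY": api_key, "Accept": "*/*"},
        method="PATCH",
    )


# Placeholder values; replace them with your real base URL and key.
request = build_open_env_request("http://localhost:8080", "API-KEY")
print(request.get_method(), request.full_url)
```

Passing the result to `urllib.request.urlopen(request)` would send it. Only the `X-API-KEY` header is described as required above; the other headers in the curl example are ones the HTTP client normally adds itself.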

1.2 Current base URL

http://domain:port
  • The test environment can be accessed with the following key:
API-KEY

1.3 Response format

  • Basic response structure:
Field | Type | Description
code | Integer | Return code. 0: success; any other value: error code
msg | String | Error message returned when an exception occurs
data | Object or Array | Data object or data list, defined per endpoint
next | String | Present only on list endpoints. Starting value (exclusive) of the next page; null means there is no next page (this is the last page, or there is no data)
// Data object example (the msg value "成功" means "Success")
{
    "code": 0,
    "msg": "成功",
    "data": {}
}
// Data array example
{
    "code": 0,
    "msg": "成功",
    "data": []
}
// Exception example (the msg value "参数异常" means "Invalid parameter")
{
    "code": 400009,
    "msg": "参数异常",
    "data": null
}
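
A client will usually unwrap this envelope in one place. The sketch below assumes the structure described above. `ApiError`, `unwrap`, and `iter_items` are hypothetical names, and because this page does not say how the `next` value is sent back to the server, the paging helper takes a caller-supplied `fetch_page` function rather than guessing a parameter name.

```python
import json
from typing import Any, Callable, Iterator, Optional, Tuple


class ApiError(Exception):
    """Raised when the envelope's code is non-zero (hypothetical helper)."""

    def __init__(self, code: Any, msg: str):
        super().__init__(f"[{code}] {msg}")
        self.code = code
        self.msg = msg


def unwrap(raw: str) -> Tuple[Any, Optional[str]]:
    """Return (data, next) from a raw response body, or raise ApiError."""
    body = json.loads(raw)
    if body.get("code") != 0:
        raise ApiError(body.get("code"), body.get("msg", ""))
    return body.get("data"), body.get("next")


def iter_items(fetch_page: Callable[[Optional[str]], str]) -> Iterator[Any]:
    """Yield items from a list endpoint until next is null.

    fetch_page is an assumption of this sketch: it receives the previous
    next value (None for the first page) and returns the raw response body.
    """
    cursor = None
    while True:
        data, cursor = unwrap(fetch_page(cursor))
        yield from data or []
        if cursor is None:
            break
```

Raising on any non-zero `code` keeps the success path free of status checks; callers that need to react to a specific code can inspect `ApiError.code`.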

2. Endpoints

2.1 Save the environment configuration

URL: /v2/env

Request method: POST

Parameters:

Name | Type | Required | Description
random_ua | bool | Yes | Random User-Agent. true: regenerate the UA every time the environment is opened; false: reuse the UA generated the first time
random_fingerprint | bool | Yes | Random fingerprint. true: regenerate the fingerprint every time the environment is opened; false: reuse the fingerprint generated the first time
proxy_update_type | string | Yes | Proxy account update mode. COVER: overwrite; APPEND: append
proxy_way | string | Yes | Proxy selection method. NON_USE: no proxy (defaults to Singapore); RANDOM: pick a proxy account at random; USE_ONE: use each proxy account only once
proxys | [proxy] | Yes | Proxy account information
  • proxy
Name | Type | Required | Description
type | string | Yes | Proxy type: NON_USE (no proxy), HTTP, HTTPS, SSH, or SOCKS5
host | string | Yes | Proxy host
port | string | Yes | Proxy port
user_name | string | Yes | Proxy account username
passwd | string | Yes | Proxy password
  • Request examples
  • Update all settings
{
    "proxy_update_type" : "COVER",
    "proxy_way": "RANDOM",
    "proxys": [
        {"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"},
        {"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"}
    ],
    "random_ua": true,
    "random_fingerprint": true
}
  • Update proxy data
{
    "proxy_update_type" : "COVER",
    "proxys": [
        {"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"},
        {"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"}
    ]
}
  • Update the proxy selection method
{
    "proxy_way": "USE_ONE"
}
  • Update UA settings
{
    "random_ua": true,
    "ua": ""
}
  • Update the random fingerprint setting
{
  "random_fingerprint": true
}
  • Example response
{
    "code": 0,
    "msg": "成功",
    "data": null
}
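
Because the request examples above send only the fields being changed, a small builder can validate the enumerated values and drop unset fields before the body is posted to /v2/env. This is a sketch under that assumption. `build_env_config` is a hypothetical helper, and the allowed values are copied from the parameter tables above.

```python
from typing import Any, Dict, List, Optional

# Allowed values, copied from the parameter tables above.
PROXY_UPDATE_TYPES = {"COVER", "APPEND"}
PROXY_WAYS = {"NON_USE", "RANDOM", "USE_ONE"}
PROXY_TYPES = {"NON_USE", "HTTP", "HTTPS", "SSH", "SOCKS5"}


def build_env_config(
    proxy_update_type: Optional[str] = None,
    proxy_way: Optional[str] = None,
    proxys: Optional[List[Dict[str, str]]] = None,
    random_ua: Optional[bool] = None,
    random_fingerprint: Optional[bool] = None,
) -> Dict[str, Any]:
    """Validate enum values and return a body holding only the fields that were set."""
    if proxy_update_type is not None and proxy_update_type not in PROXY_UPDATE_TYPES:
        raise ValueError(f"invalid proxy_update_type: {proxy_update_type!r}")
    if proxy_way is not None and proxy_way not in PROXY_WAYS:
        raise ValueError(f"invalid proxy_way: {proxy_way!r}")
    for proxy in proxys or []:
        if proxy.get("type") not in PROXY_TYPES:
            raise ValueError(f"invalid proxy type: {proxy.get('type')!r}")

    candidate = {
        "proxy_update_type": proxy_update_type,
        "proxy_way": proxy_way,
        "proxys": proxys,
        "random_ua": random_ua,
        "random_fingerprint": random_fingerprint,
    }
    # Omit unset fields so a partial update leaves other settings untouched.
    return {key: value for key, value in candidate.items() if value is not None}


print(build_env_config(proxy_way="USE_ONE"))
```

The returned dictionary can be passed as the JSON body of the POST request, for example through the `json=` argument of `requests.post`.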

2.2 Open the environment

URL: /v2/env/open_env

Request method: PATCH

Response parameters:

Name | Type | Required | Description
url | string | Yes | CDP connection URL used to control the browser
session_id | string | Yes | ID of the current CDP session

Error codes:

Error code | Message | Explanation
300104 | The proxy configuration is exhausted; update the proxy configuration | With proxy_way set to USE_ONE, every proxy configured for the account has already been used
300105 | Too many browser instances have been started; close some of them and try again | Too many instances are running; close the instances that are no longer in use
300106 | Cloud browser error | The cloud browser failed to start; check the returned message (commonly caused by invalid proxy information)
300000 | Business error | System error; check the returned message
  • Example response
{
    "code": 0,
    "msg": "成功",
    "data": {
        "url": "ws://8.222.226.165:8081/cdp/c0d7fb01933d472687c04bdb47337024",
        "session_id": "c0d7fb01933d472687c04bdb47337024"
    }
}
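
One way to act on these codes is to turn a failed open_env response into an exception that carries the documented explanation. The sketch below works on an already decoded response body. `OpenEnvError` and `parse_open_env` are hypothetical names, and the explanations are paraphrased from the error-code table above.

```python
from typing import Any, Dict, Tuple

# Explanations paraphrased from the error-code table above.
OPEN_ENV_ERRORS = {
    300104: "proxy_way is USE_ONE and every configured proxy has been used; update the proxy configuration",
    300105: "too many browser instances are running; close unused instances and retry",
    300106: "the cloud browser failed to start; commonly caused by invalid proxy information",
    300000: "system error; check the returned message",
}


class OpenEnvError(Exception):
    """Raised when open_env returns a non-zero code (hypothetical helper)."""

    def __init__(self, code: Any, msg: str):
        hint = OPEN_ENV_ERRORS.get(code, "undocumented error code")
        super().__init__(f"[{code}] {msg} ({hint})")
        self.code = code


def parse_open_env(body: Dict[str, Any]) -> Tuple[str, str]:
    """Return (session_id, cdp_url) from a decoded open_env response, or raise."""
    if body.get("code") != 0:
        raise OpenEnvError(body.get("code"), body.get("msg", ""))
    data = body["data"]
    return data["session_id"], data["url"]
```

A caller could, for instance, catch `OpenEnvError`, check `code == 300105`, close idle sessions through /v2/env/close_env, and then retry.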

2.3 Close the environment

URL: /v2/env/close_env

Request method: PATCH

  • Example response
{
    "code": 0,
    "msg": "成功",
    "data": null
}

3. Test code

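import asyncio
import logging
import os
from typing import Optional, Tuple

import requests
from playwright.async_api import async_playwright

# Configure logging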
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('browser_control.log', encoding='utf-8'),  # Write to file
        logging.StreamHandler()  # Output to console
    ]
)
logger = logging.getLogger(__name__)

# API endpoints and keys
OPEN_ENV_URL = "http://domain:port/v2/env/open_env"
CLOSE_ENV_URL = "http://domain:port/v2/env/close_env"
X_API_KEY = "API-KEY"  # Please replace with your actual key

# Create screenshot directory (if it doesn't exist)
SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)

# Define custom exception class to distinguish different types of errors
class BrowserAPIError(Exception):
    """Custom exception class to represent errors when interacting with the Browser API"""
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"API Error [{code}]: {message}")

async def open_browser_session() -> Tuple[Optional[str], Optional[str]]:
    """
    Call the /v2/env/open_env endpoint to obtain a new browser session ID and CDP URL.
    Returns (session_id, cdp_url) on success.
    Logs the error and returns (None, None) on failure.
    """
    headers = {
        "X-API-KEY": X_API_KEY,
        "Content-Type": "application/json"
    }
    try:
        # Send PATCH request (requests is blocking, which is acceptable here
        # because the cycles run one after another)
        response = requests.patch(OPEN_ENV_URL, headers=headers, timeout=30)  # Time out after 30 s
        response.raise_for_status()  # Raise on HTTP 4xx/5xx

        # Parse JSON response
        try:
            data = response.json()
        except requests.exceptions.JSONDecodeError:
            logger.error(f"API response is not valid JSON: {response.text}")
            return None, None

        # Get business status code and message
        code = data.get('code')
        message = data.get('msg', 'Unknown error')

        # Check if business logic was successful
        if code == 0:
            session_id = data['data']['session_id']
            cdp_url = data['data']['url']
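            logger.info(f"Obtained new session: session_id={session_id}, CDP URL={cdp_url}")
            return session_id, cdp_url
        else:
            # Explanations for the documented error codes
            error_messages = {
                300104: "Proxy configuration exhausted; update the proxy configuration. With proxy_way set to USE_ONE, every proxy configured for the account has been used",
                300105: "Too many browser instances have been started; close some and retry. Close the instances that are no longer in use",
                300106: "Cloud browser error; the cloud browser failed to start, check the returned message (commonly caused by invalid proxy information)",
                300000: "Business error; system error, check the returned message"
            }
            # Log the raw message, then the detailed explanation (or a default)
            logger.error(f"open_env response message: {message}")
            detailed_message = error_messages.get(code, f"Unknown error: {message}")
            logger.error(f"Failed to obtain session: [{code}] {detailed_message}")

            # This example logs the failure and returns (None, None). Raise
            # BrowserAPIError(code, detailed_message) here instead if the caller
            # should handle specific error codes.
            return None, None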

    except requests.exceptions.Timeout:
        logger.error("Request timed out; check the network connection or the API service status")
        return None, None
    except requests.exceptions.ConnectionError:
        logger.error("Network connection error; check the network or the API address")
        return None, None
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request error: {e}")
        return None, None
    except KeyError as e:
        logger.error(f"Unexpected API response structure; missing required field: {e}")
        return None, None
    except Exception as e:
        logger.error(f"Unexpected error while obtaining session: {e}")
        return None, None

async def close_browser_session(session_id: str) -> bool:
    """
    Call the /close_env interface to close a specified browser session
    :param session_id: Session ID to close
    :return: True on success, False on failure
    """
    if not session_id:
        logger.warning("Attempted to close an empty session ID")
        return False

    headers = {
        "X-API-KEY": X_API_KEY,
        "Content-Type": "application/json"
    }
    try:
        # Note: Usually closing a session requires passing session_id in the request body or parameters
        # Assuming session_id is passed via request body; adjust based on actual API documentation
        # Example: passing via request body
        payload = {"session_id": session_id}
        response = requests.patch(CLOSE_ENV_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        data = response.json()
        if data.get('code') == 0:
            logger.info(f"Session closed: session_id={session_id}")
            return True
        else:
            logger.error(f"Failed to close session: [{data.get('code')}] {data.get('msg')}")
            return False

    except Exception as e:
        logger.error(f"Failed to close session: {e}")
        return False

async def take_screenshot_with_playwright(cdp_url: str, screenshot_path: str):
    """
    Use Playwright to connect to CDP URL, open page, and take screenshot
    :param cdp_url: CDP WebSocket URL
    :param screenshot_path: Full path and filename where screenshot will be saved
    """
    async with async_playwright() as p:
        browser = None
        page = None
        try:
            # Create browser instance using CDP connection
            browser = await p.chromium.connect_over_cdp(cdp_url)
            page = await browser.new_page()

            # Open the target website
            await page.goto("https://ip111.cn/")
            logger.info("Navigated to https://ip111.cn/")

            # Give the page time to render
            # (alternatively: await page.wait_for_load_state("networkidle"))
            await page.wait_for_timeout(2000)

            # Take a full-page screenshot
            await page.screenshot(path=screenshot_path, full_page=True)
            logger.info(f"Screenshot saved to: {screenshot_path}")

        except Exception as e:
            logger.error(f"Playwright operation failed: {e}")
            raise
        finally:
            # Ensure resources are properly released
            if page:
                await page.close()
            if browser:
                await browser.close()

async def run_cycle(cycle_number: int) -> bool:
    """
    Execute one complete cycle: Open session -> Screenshot -> Close session
    :param cycle_number: 1-based index of the current cycle
    :return: True on success, False on failure
    """
    logger.info(f"Starting cycle {cycle_number}...")
    session_id = None
    cdp_url = None

    try:
        # 1. Obtain a new browser session
        session_id, cdp_url = await open_browser_session()
        if not session_id or not cdp_url:
            logger.error("Could not obtain a browser session; skipping this cycle")
            return False  # Raise an exception here instead if the caller should handle it

        # Generate unique screenshot filename
        screenshot_filename = f"screenshot_{cycle_number:02d}.png" # e.g., screenshot_01.png
        screenshot_path = os.path.join(SCREENSHOT_DIR, screenshot_filename)

        # 2. Control browser with Playwright and take screenshot, passing specific filename
        await take_screenshot_with_playwright(cdp_url, screenshot_path)

        logger.info(f"Cycle {cycle_number} completed successfully")
        return True

    except Exception as e:
        logger.error(f"Cycle {cycle_number} raised an exception: {e}")
        return False
    finally:
        # 3. Always try to close the browser session to avoid leaking instances,
        # but only if a session_id was actually obtained
        if session_id:
            try:
                await close_browser_session(session_id)
            except Exception as close_error:
                logger.error(f"Error while closing session {session_id}: {close_error}")

async def main():
    """
    Main function: Execute loop for a specified number of times
    """
    total_cycles = 1 # You can adjust the cycle count as needed
    successful_cycles = 0

    for i in range(1, total_cycles + 1):
        # Execute one round of operation
        success = await run_cycle(i)

        if success:
            successful_cycles += 1
            logger.info(f"Cycle {i} succeeded")
        else:
            logger.error(f"Cycle {i} failed")

        # Optional: pause between cycles
        if i < total_cycles:
            delay_seconds = 2  # Seconds to wait before the next cycle
            logger.info(f"Waiting {delay_seconds} seconds before the next cycle...")
            await asyncio.sleep(delay_seconds)

    logger.info(f"All cycles finished. Succeeded: {successful_cycles}/{total_cycles}")

if __name__ == "__main__":
    asyncio.run(main())
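
The open, use, close pattern in `run_cycle` could also be packaged as an async context manager so that the close call cannot be forgotten. The following is a sketch, not part of the test script. `browser_session` is a hypothetical name, and the open and close functions are passed in so the example runs with stand-ins instead of the real API.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, List, Optional, Tuple

OpenFn = Callable[[], Awaitable[Tuple[Optional[str], Optional[str]]]]
CloseFn = Callable[[str], Awaitable[bool]]


@asynccontextmanager
async def browser_session(open_fn: OpenFn, close_fn: CloseFn) -> AsyncIterator[str]:
    """Yield a CDP URL and always attempt to close the session afterwards."""
    session_id, cdp_url = await open_fn()
    if not session_id or not cdp_url:
        raise RuntimeError("could not obtain a browser session")
    try:
        yield cdp_url
    finally:
        await close_fn(session_id)


async def _demo() -> List[str]:
    """Exercise the context manager with stand-in open/close functions."""
    events: List[str] = []

    async def fake_open() -> Tuple[str, str]:
        events.append("open")
        return "sid-1", "ws://example.invalid/cdp/sid-1"

    async def fake_close(session_id: str) -> bool:
        events.append(f"close:{session_id}")
        return True

    async with browser_session(fake_open, fake_close) as cdp_url:
        events.append(f"use:{cdp_url}")
    return events


demo_events = asyncio.run(_demo())
print(demo_events)
```

In the script above, `open_browser_session` and `close_browser_session` have matching signatures and could be passed in place of the stand-ins.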