Tencent Cloud Code AnalysisTencent Cloud Code Analysis
Guide
API
  • IDE

    • Visual Studio Code TCA Plugin
    • JetBrains IDEs TCA Plugin
  • CNB

    • CNB TCA Plugin
    • CNB TCA Badge
  • Jenkins

    • Jenkins TCA Plugin
  • MCP

    • TCA MCP Server
  • Advanced

    • Intranet Repository Analysis
    • Trigger Analysis via API
    • Optimizing Analysis Speed
    • Improving the Issue Detection Rate
  • Other

    • Issue Ignoring Methods
Try Now
  • Service Agreement
  • Privacy Agreement
  • 简体中文
  • English
Guide
API
  • IDE

    • Visual Studio Code TCA Plugin
    • JetBrains IDEs TCA Plugin
  • CNB

    • CNB TCA Plugin
    • CNB TCA Badge
  • Jenkins

    • Jenkins TCA Plugin
  • MCP

    • TCA MCP Server
  • Advanced

    • Intranet Repository Analysis
    • Trigger Analysis via API
    • Optimizing Analysis Speed
    • Improving the Issue Detection Rate
  • Other

    • Issue Ignoring Methods
Try Now
  • Service Agreement
  • Privacy Agreement
  • 简体中文
  • English
  • 深入

    • Intranet Repository Analysis
    • Trigger Analysis via API
    • Optimizing Analysis Speed
    • Improving the Issue Detection Rate

Trigger Analysis via API

This document aims to guide how to trigger analysis execution through TCA APIs.

Application Scenarios

  • Can be encapsulated into an API script to trigger analysis by calling the API as needed.

  • Can be encapsulated into a CI plugin and integrated into the CI pipeline to trigger analysis.

Guide to Triggering Code Analysis

Overall Process

By calling the API, register the code repository into the TCA platform, create an analysis project based on configurations such as the analysis scheme, and then trigger code analysis. TCA schedules the task to run code analysis on node machines and retrieves the analysis task results through polling. Finally, determine whether the quality gate passes based on the quality gate information in the results, and obtain detailed task execution information and issue lists.

Hierarchy Description

Team -> Project -> Code Repository -> Analysis Project (composed of branch + analysis path + analysis scheme to form a unique analysis project).

  1. Create a project group. If it already exists, use it directly without creation.

  2. Register the code repository. If it already exists, use it directly without registration.

  3. Create an analysis project. If it already exists, use it directly without creation.

  4. Start the analysis task.

  5. Poll the task status to determine if the task execution succeeded/failed.

  6. Obtain analysis results, quality gate information, etc.

  7. Obtain code inspection issue data.

Prerequisite Steps

Overview of Prerequisite Steps

  • Get API Access Token

  • Get Code Repository Credential ID

  • Get Analysis Scheme ID

  • Integrate Online Node

  • Get API Access Token: Required for API interface authentication, including user_id and token.

    API Access Token

  • Get Code Repository Credential ID: Required for registering the code repository, used to pull code when executing analysis, i.e., scm_account_id. If it does not exist, create a credential.

    Get Credential

  • Get Analysis Scheme ID: Required for creating an analysis project. You can obtain scheme_id from the Analysis Scheme - Basic Configuration page or the URL link. If it does not exist, create a team/project analysis scheme as needed.

  • Integrate Online Node: Required for executing code analysis. Essentially, tasks triggered via API are executed on TCA online nodes. Ensure that TCA has available online nodes; otherwise, analysis will fail due to no available nodes.

API Invocation Process

For API interface authentication, refer to the document: API Interface Authentication

1. Create Project Group

It is recommended to first check if the project group exists. If not, create it; if it exists, use it directly.

GET|POST /server/main/api/orgs/{org_sid}/teams/
  • Get Project Group List API

  • Create Project Group API:

    import requests
    
    url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/"
    post_data = {
        "name": team_name,
        "display_name": team_name
    }
    headers = get_headers(user_id, token)
    
    resp = requests.post(url, json=post_data, headers=headers)
    result = resp.json()
    print(result)
    

2. Register Code Repository

It is recommended to first check if the corresponding code repository exists. If not, register it; if it exists, use it directly.

GET|POST /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/
  • Get Code Repository List API

  • Create Code Repository API:

    import requests
    
    url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/"
    post_data = {
        "scm_url": repo_url,
        "scm_type": "git",
        "scm_auth": {
            "auth_type": "password",
            "scm_account": scm_account_id
        },
        "created_from": "api"
    }
    headers = get_headers(user_id, token)
    
    resp = requests.post(url, json=post_data, headers=headers)
    result = resp.json()
    print(result)
    

3. Create Analysis Project

It is recommended to first check if the corresponding analysis project exists. If not, create it; if it exists, use it directly.

GET|POST /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/
  • Get Analysis Project List API

  • Create Analysis Project API:

    import requests
    
    url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/"
    post_data = {
        "branch": repo_branch,
        "global_scheme_id": scheme_id,
        "use_scheme_template": True,
        "created_from": "api"
    }
    headers = get_headers(user_id, token)
    
    resp = requests.post(url, json=post_data, headers=headers)
    result = resp.json()
    print(result)
    

4. Start Analysis Task

POST /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scans/create/
  • Normal Trigger

    post_data = {
        "incr_scan": True, # Whether to perform incremental scan (False for full scan, True for incremental)
    }
    
  • Merge Request Trigger

    post_data = {
        "incr_scan": True, # Whether to perform incremental scan (False for full scan, True for incremental)
        "ignore_branch_issue": "target_branch", # Target branch for comparison
        "ignore_merged_issue": True
    }
    

Start Analysis API:

import requests

url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scans/create/"
headers = get_headers(user_id, token)

resp = requests.post(url, json=post_data, headers=headers)
result = resp.json()
print(result)

5. Poll Task Status

GET /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/jobs/{job_id}/detail/

Get Task Details API: Poll the scan status until state == 2:

6. Get Analysis Results

After scanning completes, the interface returns a complete scan results object results[0] containing multi-dimensional scan data.

GET /server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scaninfos/

Get Analysis Overview List API:

import requests

url = f"{base_url}/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scaninfos/"
headers = get_headers(user_id, token)

resp = requests.get(url, headers=headers)
result = resp.json()
results = result.get("data", {}).get("results", [])
print(result[0])
6.1. Code Inspection Results (lintscan)
lint = result[0].get("lintscan", {})
print({
    "issue_open_num": lint.get("issue_open_num"),
    "issue_fix_num": lint.get("issue_fix_num"),
    "issue_detail_num": lint.get("issue_detail_num"),
})
6.2. Cyclomatic Complexity Results (cyclomaticcomplexityscan)
cc = result[0].get("cyclomaticcomplexityscan", {})
print({
    "diff_cc_num": cc.get("diff_cc_num"),
    "cc_open_num": cc.get("cc_open_num"),
})
6.3. Duplicate Code Results (duplicatescan)
dup = result[0].get("duplicatescan", {})
print({
    "duplicate_block_count": dup.get("duplicate_block_count"),
    "total_duplicate_line_count": dup.get("total_duplicate_line_count"),
})
6.4. Code Statistics Results (clocscan)
cloc = result.get("clocscan", {})
print({
    "code_line_num": cloc.get("code_line_num"),
    "comment_line_num": cloc.get("comment_line_num"),
    "blank_line_num": cloc.get("blank_line_num"),
    "total_line_num": cloc.get("total_line_num"),
})
6.5. Get Quality Gate Status (Quality Gate)
quality = result[0].get("qualityscan", {})
status = quality.get("status")  # "success": Gate passed, "failure": Gate failed, "close": Gate not enabled
description = quality.get("description")

if status == "success":
    print("Quality gate passed")
elif status == "close":
    print("Quality gate not enabled")
else:
    print(f"Quality gate failed: {description}")

7. Get Code Inspection Issue Data

GET /server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/codelint/issues/

Get Issue List Data API:

import requests
import json

url = f"{base_url}/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/codelint/issues/"
headers = get_headers(user_id, token)

resp = requests.get(url, headers=headers)
result = resp.json()
print(json.dumps(result, indent=2, ensure_ascii=False))

Script Reference

Environment Variables

When executing the script, environment variables can be injected to dynamically adjust parameters and achieve automated integration.

# TCA platform service address
TCA_BASE_URL=https://tca.tencent.com

# API access token
TCA_USER_ID=your_user_id
TCA_TOKEN=your_token

# Unique identifier of the team
TCA_ORG_SID=your_org_sid

# Unique identifier of the project group
TCA_TEAM_NAME=your_team_name

# Code repository URL
TCA_REPO_URL=your_repo_url
# Code repository credential ID
TCA_SCM_ACCOUNT_ID=your_scm_account_id

# Analysis scheme ID (can use team/project analysis scheme, note permission issues)
TCA_SCHEME_ID=your_scheme_id

# Branch name
TCA_REPO_BRANCH=your_repo_branch

# Whether to perform incremental scan
TCA_INCR_SCAN=true

# Whether to ignore issues in specified code repository branches
TCA_IGNORE_MERGED_ISSUE=true
# Target branch for merge request scenarios
TCA_IGNORE_BRANCH_ISSUE=your_ignore_branch

Script Content

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Start code analysis via API
"""

import os
import sys
import json
import requests
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from time import time, sleep
from hashlib import sha256

# Log configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('tca_ci_pipeline_saas.log')
    ]
)
logger = logging.getLogger(__name__)

# OpenAPI path mapping
TCA_OPEN_APIS = {
    "project_team_list": "%s/server/main/api/orgs/{org_sid}/teams/",
    "pt_repo_list": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/",
    "project_list": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/",
    "project_scan_list": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scans/create/",
    "job_detail": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/jobs/{job_id}/detail/",
    "project_analysis_scan_list": "%s/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scaninfos/",
    "project_issue_list": "%s/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/codelint/issues/",
}


@dataclass
class TCAConfig:
    """TCA Configuration Class"""
    base_url: str
    tca_token: str
    user_id: str
    org_sid: str
    team_name: str
    repo_url: str
    repo_branch: str
    repo_mr_branch: str
    scm_account_id: int
    scheme_id: int
    incr_scan: bool
    ignore_merged_issue: bool
    ignore_branch_issue: str
    timeout: int = 30
    max_retries: int = 3


class TCAIntegrationError(Exception):
    """TCA Integration Exception"""
    pass


class TCAClient:
    """TCA API Client"""

    def __init__(self, config: TCAConfig):
        self.config = config
        self.session = requests.Session()

    def get_headers(self, user_id: Optional[str] = None, token: Optional[str] = None):
        timestamp = int(time())
        user_id = user_id or self.config.user_id
        token = token or self.config.tca_token
        token_sig = "%s%s#%s#%s%s" % (timestamp, user_id, token, user_id, timestamp)
        ticket = sha256(token_sig.encode("utf-8")).hexdigest().upper()
        return {
            "TCA-USERID": user_id,
            "TCA-TIMESTAMP": str(timestamp),
            "TCA-TICKET": ticket
        }

    def _is_success_status(self, status_code: int) -> bool:
        """Determine if HTTP status code indicates success (2xx)"""
        return 200 <= status_code < 300

    def _make_request(self, method: str, url: str, data: Optional[Dict] = None,
                      params: Optional[Dict] = None) -> Dict:
        """Send API request (url must be a full path)"""

        for attempt in range(self.config.max_retries):
            try:
                headers = self.get_headers()
                response = self.session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=data,
                    params=params,
                    timeout=self.config.timeout
                )

                if self._is_success_status(response.status_code):
                    return response.json()
                elif response.status_code == 401:
                    raise TCAIntegrationError("Invalid or expired TCA Token")
                elif response.status_code == 403:
                    raise TCAIntegrationError("Insufficient permissions, please check team ID and permission settings")
                elif response.status_code == 404:
                    raise TCAIntegrationError("Requested resource does not exist")
                elif response.status_code >= 500:
                    logger.warning(f"Server error, retrying ({attempt + 1}/{self.config.max_retries})...")
                    if attempt == self.config.max_retries - 1:
                        raise TCAIntegrationError("Internal server error, please try again later")
                    sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise TCAIntegrationError(f"API request failed: {response.status_code} - {response.text}")

            except requests.exceptions.Timeout:
                logger.warning(f"Request timed out, retrying ({attempt + 1}/{self.config.max_retries})...")
                if attempt == self.config.max_retries - 1:
                    raise TCAIntegrationError("Request timed out, please check network connection")
            except requests.exceptions.ConnectionError:
                logger.warning(f"Connection error, retrying ({attempt + 1}/{self.config.max_retries})...")
                if attempt == self.config.max_retries - 1:
                    raise TCAIntegrationError("Network connection error, please check network settings")

    def _build_url(self, key: str, **kwargs: Any) -> str:
        base = self.config.base_url.rstrip('/')
        tpl = TCA_OPEN_APIS[key] % base
        return tpl.format(**kwargs)

    def get_project_teams(self) -> Dict:
        url = self._build_url("project_team_list", org_sid=self.config.org_sid)
        return self._make_request("GET", url)

    def get_pt_repos(self) -> Dict:
        url = self._build_url("pt_repo_list", org_sid=self.config.org_sid, team_name=self.config.team_name)
        return self._make_request("GET", url)

    def get_pt_repo_id(self, repo_url: Optional[str] = None) -> int:
        """Get the ID of the corresponding repository under the team based on the repository URL (exact match only).
        Precisely compares `scm_url`, raises TCAIntegrationError if not found.
        """
        result = self.get_pt_repos()
        data = result.get("data") or {}
        results = data.get("results") or []
        repo_url = repo_url or self.config.repo_url
        if not isinstance(results, list):
            raise TCAIntegrationError("Failed to get team repository list, incorrect return format")
        for item in results:
            if not isinstance(item, dict):
                continue
            if repo_url == item.get("scm_url") or repo_url == item.get("format_url"):
                repo_id_val = item.get("id")
                if repo_id_val is not None:
                    return int(repo_id_val)
        raise TCAIntegrationError(f"Repository {repo_url} not found under project group {self.config.team_name}")

    def create_project_team(self) -> Dict:
        """Create a project group"""
        logger.info(f"Creating project group: {self.config.team_name}")
        url = self._build_url("project_team_list", org_sid=self.config.org_sid)
        post_data = {
            'name': self.config.team_name,
            'display_name': self.config.team_name
        }
        result = self._make_request("POST", url, data=post_data)
        return result

    def register_pt_repo(self) -> Dict:
        """Register a code repository (at the project group level)"""
        logger.info(f"Project group: {self.config.team_name}, creating code repository: {self.config.repo_url}")
        url = self._build_url("pt_repo_list", org_sid=self.config.org_sid, team_name=self.config.team_name)
        post_data = {
            'scm_url': self.config.repo_url,
            'scm_type': "git",
            'scm_auth': {
                "auth_type": "password",
                "scm_account": self.config.scm_account_id
            },
            'created_from': "api"
        }
        result = self._make_request("POST", url, data=post_data)
        return result

    def create_project(self) -> Dict:
        """Create an analysis project"""
        logger.info(f"Project group: {self.config.team_name}, creating analysis project for code repository: {self.config.repo_url}")
        repo_id = self.get_pt_repo_id(self.config.repo_url)
        url = self._build_url("project_list", org_sid=self.config.org_sid, team_name=self.config.team_name,
                              repo_id=repo_id)
        post_data = {
            'branch': self.config.repo_branch,
            'global_scheme_id': self.config.scheme_id,
            'use_scheme_template': True,
            'created_from': "api"
        }
        result = self._make_request("POST", url, data=post_data)

        return result

    def get_projects(self) -> Dict:
        repo_id = self.get_pt_repo_id(self.config.repo_url)
        url = self._build_url("project_list", org_sid=self.config.org_sid, team_name=self.config.team_name,
                              repo_id=repo_id)
        return self._make_request("GET", url)

    def get_project_id(self, scheme_id: Optional[int] = None, branch: Optional[str] = None) -> int:
        """Get the analysis project ID based on scan_scheme.id and branch.
        - Precisely matches `scan_scheme.id` and `branch` from the project list
        """
        scheme_id = scheme_id or self.config.scheme_id
        branch = branch or self.config.repo_branch
        logger.info(f"scheme_id={scheme_id}, branch={branch}")
        result = self.get_projects()
        data = result.get("data") or {}
        results = data.get("results") or []
        if not isinstance(results, list):
            raise TCAIntegrationError("Failed to get project list, incorrect return format")
        for item in results:
            if not isinstance(item, dict):
                continue
            item_scheme = (item.get("scan_scheme") or {}).get("id")
            item_branch = item.get("branch")
            if int(item_scheme) == int(scheme_id) and item_branch == branch:
                proj_id = item.get("id")
                if proj_id is not None:
                    return int(proj_id)

        raise TCAIntegrationError(f"No matching analysis project found, scheme_id={scheme_id}, branch={branch}")

    def start_scan(self) -> Dict:
        """Start a scan task"""
        repo_id = self.get_pt_repo_id()
        project_id = self.get_project_id()
        url = self._build_url("project_scan_list", org_sid=self.config.org_sid,
                              team_name=self.config.team_name, repo_id=repo_id, project_id=project_id)
        incr_scan = self.config.incr_scan
        ignore_branch_issue = self.config.ignore_branch_issue
        ignore_merged_issue = self.config.ignore_merged_issue

        post_data = {
            'incr_scan': incr_scan,
            'ignore_branch_issue': ignore_branch_issue,
            'ignore_merged_issue': ignore_merged_issue
        }
        result = self._make_request("POST", url, data=post_data)
        return result

    def wait_for_scan_completion(self, job_id: int) -> Dict:
        """Wait for the analysis task status to become 2 (completed)"""
        repo_id = self.get_pt_repo_id()
        project_id = self.get_project_id()
        url = self._build_url("job_detail", org_sid=self.config.org_sid,
                              team_name=self.config.team_name, repo_id=repo_id, project_id=project_id, job_id=job_id)
        max_wait_time = 600  # Maximum wait time 10 minutes (600 seconds, customizable)
        interval = 30  # Check every 30 seconds
        start_time = int(time())
        while True:
            results = self._make_request("GET", url)
            # Get data from results
            data = results.get("data", {})
            if data:
                state = data.get("state")
                if state == 2:
                    logging.info("Scan results are available, stopping wait")
                    break  # Exit loop as condition is met
            elapsed = int(time()) - start_time
            if elapsed > max_wait_time:
                logger.info(f"Scan exceeded {max_wait_time} seconds, status unchanged, scan failed")
                raise TCAIntegrationError(f"Scan exceeded {max_wait_time} seconds, status unchanged, scan failed")

            logging.info(
                f"Code is being scanned (current state: {data.get('state') if data else 'no data'}), retrying in 30 seconds...")
            sleep(interval)

        return results

    def get_scan_info(self) -> Dict:
        """Get the latest analysis results information"""
        repo_id = self.get_pt_repo_id()
        project_id = self.get_project_id()
        url = self._build_url("project_analysis_scan_list", org_sid=self.config.org_sid,
                              team_name=self.config.team_name, repo_id=repo_id, project_id=project_id)
        response = self._make_request("GET", url)
        results = response.get("data", {}).get("results", [])
        return results[0]

    def get_issue_list(self) -> Dict:
        repo_id = self.get_pt_repo_id()
        project_id = self.get_project_id()
        url = self._build_url("project_issue_list", org_sid=self.config.org_sid,
                              team_name=self.config.team_name, repo_id=repo_id, project_id=project_id)
        result = self._make_request("GET", url)
        return result


class TCAPipeline:
    """TCA Pipeline Integration Class"""

    def __init__(self, config: TCAConfig):
        self.client = TCAClient(config)
        self.config = config

    def run_full_analysis(self) -> bool:
        """Run the full analysis process"""

        # 1. Create project group (or reuse existing project_team)
        exist_project_teams = self.client.get_project_teams()
        if not self.has_team_name(exist_project_teams, self.config.team_name):
            project_team = self.client.create_project_team()
        # 2. Register code repository (or reuse existing repo)
        try:
            self.client.get_pt_repo_id(self.config.repo_url)
        except TCAIntegrationError as e:
            logger.info(f"Existing code repository not found: {self.config.repo_url}, creating new repository")
            self.client.register_pt_repo()

        # 3. Create project under this repo (or reuse existing project)
        try:
            self.client.get_project_id()
        except TCAIntegrationError as e:
            logger.info(f"Existing project not found: {self.config.repo_url}, creating new project")
            self.client.create_project()

        # 4. Start scan
        start_scan_result = self.client.start_scan()

        job_id = self.get_job_id(start_scan_result)

        # 5. Wait for completion
        try:
            scan_result = self.client.wait_for_scan_completion(job_id)
        except TCAIntegrationError as e:
            logger.error(f"Scan failed: {e}")
            return False
        # 5. If no code changes, incremental scan will show result_code=1, indicating no scan needed
        result_code, result_code_msg, result_msg = self.get_result_msg(scan_result)
        if result_code == 1:
            logger.info(
                f"No code changes, result_code: {result_code}, result_code_msg: {result_code_msg}, result_msg: {result_msg} - scan info is from the last scan")
        elif result_code == 0:
            logger.info(
                f"Code changes detected, result_code: {result_code}, result_code_msg: {result_code_msg}, result_msg: {result_msg} - scan info is from the latest scan")
        elif result_code >= 100:
            logger.error(f"Scan failed: {result_code}, {result_code_msg}, {result_msg}")
            return False
        scan_info = self.client.get_scan_info()
        # 5. Get code scan results
        lint_scan_result = self.get_lint_scan_result(scan_info)
        logger.info(f"Code scan results: {lint_scan_result}")

        # 6. Get cyclomatic complexity results
        cyclomatic_complexity_result = self.get_cyclomatic_complexity_result(scan_info)
        logger.info(f"Cyclomatic complexity results: {cyclomatic_complexity_result}")

        # 7. Get duplicate code results
        duplicate_code_result = self.get_duplicate_code_result(scan_info)
        logger.info(f"Duplicate code results: {duplicate_code_result}")

        # 8. Get code statistics results
        get_cloc_result = self.get_cloc_result(scan_info)
        logger.info(f"Code statistics results: {get_cloc_result}")

        # 9. Get issue list
        issues = self.client.get_issue_list()
        logger.info(f"Issue list: {issues}")

        # 10. Get quality gate results
        quality_gate_result = self.get_quality_gate_result(scan_info)
        logger.info(f"Quality gate results: {quality_gate_result}")
        if quality_gate_result.get("status") == "success" or quality_gate_result.get("status") == "close":
            return True
        else:
            return False

    def get_job_id(self, result: dict) -> int:
        try:
            job_id = result["data"]["job"]["id"]
        except (KeyError, TypeError):
            raise TCAIntegrationError("Incorrect response format, data.job.id not found")
        return job_id

    def get_result_msg(self, result: dict) -> Tuple[int, str, str]:
        try:
            result_code = result["data"]["result_code"]
            result_code_msg = result["data"]["result_code_msg"]
            result_msg = result["data"]["result_msg"]
        except (KeyError, TypeError):
            raise TCAIntegrationError("Incorrect response format, data.result_code information not found")
        return result_code, result_code_msg, result_msg

    def get_lint_scan_result(self, results: dict) -> dict:
        """Get code scan results (lintscan)"""
        lint = results.get("lintscan") or {}
        return {
            "issue_open_num": lint.get("issue_open_num"),
            "issue_fix_num": lint.get("issue_fix_num"),
            "issue_detail_num": lint.get("issue_detail_num"),
        }

    def get_cyclomatic_complexity_result(self, results: dict) -> dict:
        """Get cyclomatic complexity results (cyclomaticcomplexityscan)"""
        cc = results.get("cyclomaticcomplexityscan") or {}
        return {
            "diff_cc_num": cc.get("diff_cc_num"),
            "cc_open_num": cc.get("cc_open_num"),
        }

    def get_duplicate_code_result(self, results: dict) -> dict:
        """Get duplicate code results (duplicatescan)"""
        dup = results.get("duplicatescan") or {}
        return {
            "duplicate_block_count": dup.get("duplicate_block_count"),
            "total_duplicate_line_count": dup.get("total_duplicate_line_count"),
        }

    def get_cloc_result(self, results: dict) -> dict:
        """Get code statistics results (clocscan)"""
        cloc = results.get("clocscan") or {}
        return {
            "code_line_num": cloc.get("code_line_num"),
            "comment_line_num": cloc.get("comment_line_num"),
            "blank_line_num": cloc.get("blank_line_num"),
            "total_line_num": cloc.get("total_line_num"),
        }

    def get_quality_gate_result(self, results: dict) -> dict:
        """Get quality gate status (qualityscan)"""
        quality = results.get("qualityscan") or {}
        return {
            "status": quality.get("status"),
            "description": quality.get("description"),
        }

    def has_team_name(self, response: dict, team_name: str) -> bool:
        """
        Determine if there is a project group with name == team_name in the returned project_team data.

        :param response: Complete dictionary data returned by the interface
        :return: True if a team named team_name exists, otherwise False
        """
        try:
            teams = response.get("data", [])
            for team in teams:
                if team.get("name") == team_name:
                    return True
            return False
        except Exception as e:
            TCAIntegrationError(f"Error parsing team data: {e}")
            return False


def _manual_load_dotenv(candidates: List[str]) -> int:
    """Simple .env parser: Manually load KEY=VALUE into os.environ if python-dotenv is not installed.
  Only overrides variables that are not already set. Returns the number of successfully loaded variables."""
    loaded = 0
    for path in candidates:
        if not path:
            continue
        if os.path.exists(path) and os.path.isfile(path):
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    for raw_line in f:
                        line = raw_line.strip()
                        if not line or line.startswith('#'):
                            continue
                        if '=' not in line:
                            continue
                        key, val = line.split('=', 1)
                        key = key.strip()
                        val = val.strip().strip('"').strip("'")
                        if key and key not in os.environ:
                            os.environ[key] = val
                            loaded += 1
                logger.info(f"Loaded {loaded} variables from .env: {path}")
            except Exception as e:
                logger.warning(f"Failed to read .env: {path} - {e}")
    return loaded


def str_to_bool(value: str) -> bool:
    """Convert a string to a boolean value (case-insensitive)"""
    return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes']


def load_config_from_env() -> TCAConfig:
    """Load configuration only from environment variables and .env files (no longer reads config.py).
    Priority: Process environment variables > .env > Default values
    Supports two .env files:
        - .env in the same directory as the script
        - .env in the current working directory
    """
    # 1) Prioritize python-dotenv
    dotenv_loaded = False
    try:
        from dotenv import load_dotenv  # type: ignore
        from pathlib import Path
        dotenv_paths = [
            Path(__file__).with_name('.env'),
            Path.cwd() / '.env'
        ]
        for p in dotenv_paths:
            if p.exists():
                load_dotenv(dotenv_path=str(p), override=False)
                dotenv_loaded = True
                logger.info(f"Loaded .env using python-dotenv: {p}")
                break
    except Exception:
        pass

    # 2) If not loaded via python-dotenv, manually load
    if not dotenv_loaded:
        script_env = os.path.join(os.path.dirname(__file__), '.env')
        cwd_env = os.path.join(os.getcwd(), '.env')
        _manual_load_dotenv([script_env, cwd_env])

    # 3) Read environment variables
    base_url = os.getenv('TCA_BASE_URL', 'https://tca.tencent.com')
    tca_token = os.getenv('TCA_TOKEN', '')
    user_id = os.getenv('TCA_USER_ID', '')
    org_sid = os.getenv('TCA_ORG_SID', '')
    team_name = os.getenv('TCA_TEAM_NAME', '')
    repo_url = os.getenv('TCA_REPO_URL', '')
    repo_branch = os.getenv('TCA_REPO_BRANCH', '')
    repo_mr_branch = os.getenv('TCA_REPO_MR_BRANCH', '')
    scm_account_id = os.getenv('TCA_SCM_ACCOUNT_ID', '')
    scheme_id = os.getenv('TCA_SCHEME_ID', '')
    incr_scan = os.getenv('TCA_INCR_SCAN', False)
    ignore_branch_issue = os.getenv('TCA_IGNORE_BRANCH_ISSUE', '')
    ignore_merged_issue = os.getenv('TCA_IGNORE_MERGED_ISSUE', False)
    return TCAConfig(
        base_url=base_url,
        tca_token=tca_token,
        user_id=user_id,
        org_sid=org_sid,
        team_name=team_name,
        repo_url=repo_url,
        scm_account_id=int(scm_account_id),
        repo_branch=repo_branch,
        repo_mr_branch=repo_mr_branch,
        scheme_id=int(scheme_id),
        incr_scan=str_to_bool(incr_scan),
        ignore_branch_issue=ignore_branch_issue,
        ignore_merged_issue=str_to_bool(ignore_merged_issue),
    )


def main():
    """Main function - Example usage"""
    # Load configuration (priority: environment variables > config.py > default values)
    config = load_config_from_env()
    logger.info(
        f"Loaded configuration: base_url={config.base_url}, "
        f"org_sid={'***' if config.org_sid else ''}, "
        f"tca_token={'***' if config.tca_token else ''}, "
        f"user_id={'***' if config.user_id else ''}, "
        f"team_name={config.team_name}, "
        f"repo_url={config.repo_url}, "
        f"repo_branch={config.repo_branch}, "
        f"repo_mr_branch={config.repo_mr_branch}, "
        f"scm_account_id={config.scm_account_id}, "
        f"scheme_id={config.scheme_id}, "
        f"incr_scan={config.incr_scan}, "
        f"ignore_branch_issue={config.ignore_branch_issue}, "
        f"ignore_merged_issue={config.ignore_merged_issue}"
    )
    # Validate required configurations
    if (not config.org_sid \
            or not config.tca_token \
            or not config.user_id \
            or not config.team_name \
            or not config.repo_url \
            or not config.repo_branch \
            or not config.scm_account_id \
            or not config.scheme_id):
        logger.error(
            "Missing required configurations, please set TCA_USER_ID, TCA_TOKEN, TCA_ORG_SID, TCA_TEAM_NAME, TCA_REPO_URL, TCA_REPO_BRANCH, TCA_SCM_ACCOUNT_ID, TCA_SCHEME_ID")
        sys.exit(1)

    # Create pipeline instance
    pipeline = TCAPipeline(config)

    scan_result = pipeline.run_full_analysis()

    if scan_result:
        logger.info("Pipeline executed successfully")
        sys.exit(0)
    else:
        logger.error("Pipeline execution failed")
        sys.exit(1)


if __name__ == "__main__":
    main()
Last Updated:: 12/9/25, 2:27 PM
Contributors: nickctang, bruccezhang, faberihe
Prev
Intranet Repository Analysis
Next
Optimizing Analysis Speed