API 触发分析
本文档旨在指引如何通过 TCA API 的形式触发执行分析。
应用场景
可封装设计成 API 脚本,根据需求调用 API 触发分析。
可封装设计成 CI 插件,集成到 CI 流水线中触发分析。
触发代码分析指南
整体流程
通过调用 API 的形式,将代码库登记到 TCA 平台内,根据分析方案等配置创建分析项目,并触发代码分析,由 TCA 调度任务到节点机器上执行代码分析,通过轮询的方式等待获取分析任务结果,最终根据结果判断分析任务是否执行成功/失败,可以根据门禁信息判断门禁是否通过,以及获取问题列表等。
层级说明
团队 -> 项目 -> 代码库 -> 分析项目(分支 + 分析路径 + 分析方案组成唯一分析项目)。
创建项目组。若存在可直接使用,无需创建。
登记代码库。若存在可直接使用,无需登记。
创建分析项目。若存在可直接使用,无需创建。
启动分析任务。
轮询任务状态,判断任务执行成功/失败。
获取分析结果,门禁信息等。
获取代码检查问题数据。
前置步骤
获取 API 访问令牌:API 接口鉴权所需,
user_id、token。
获取代码库凭证 ID:登记代码库所需,用于执行分析时拉取代码,
scm_account_id,如不存在则需要创建凭证。
获取分析方案 ID:创建分析项目所需,可进入分析方案-基础配置页面,或 URL 链接获中获取
scheme_id,如不存在则根据需要创建团队/项目分析方案。接入在线节点:执行代码分析所需,API 触发分析本质上任务是在 TCA 线上节点上执行的,需要保证 TCA 存在可用的在线节点,否则会导致分析无可用节点而失败。
API 调用流程
API 接口鉴权参见文档:API 接口鉴权
1. 创建项目组
建议先获取是否存在项目组,如果不存在,则创建项目组,如果存在,则直接使用。
GET|POST /server/main/api/orgs/{org_sid}/teams/
import requests url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/" post_data = { "name": team_name, "display_name": team_name } headers = get_headers(user_id, token) resp = requests.post(url, json=post_data, headers=headers) result = resp.json() print(result)
2. 登记代码库
建议先获取是否存在对应代码库,如果不存在,则创建登记代码库,如果存在,则直接使用。
GET|POST /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/
import requests url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/" post_data = { "scm_url": repo_url, "scm_type": "git", "scm_auth": { "auth_type": "password", "scm_account": scm_account_id }, "created_from": "api" } headers = get_headers(user_id, token) resp = requests.post(url, json=post_data, headers=headers) result = resp.json() print(result)
3. 创建分析项目
建议先获取是否存在对应分析项目,如果不存在,则创建分析项目,如果存在,则直接使用。
GET|POST /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/
import requests url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/" post_data = { "branch": repo_branch, "global_scheme_id": scheme_id, "use_scheme_template": True, "created_from": "api" } headers = get_headers(user_id, token) resp = requests.post(url, json=post_data, headers=headers) result = resp.json() print(result)
4. 启动分析任务
POST /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scans/create/
普通触发
post_data = { "incr_scan": True, # 是否增量扫描 False是全量, True是增量 }合并请求触发
post_data = { "incr_scan": True, # 是否增量扫描 False是全量, True是增量 "ignore_branch_issue": "target_branch", # 对比分支 "ignore_merged_issue": True }
import requests
url = f"{base_url}/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scans/create/"
headers = get_headers(user_id, token)
resp = requests.post(url, json=post_data, headers=headers)
result = resp.json()
print(result)
5. 轮询任务状态
GET /server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/jobs/{job_id}/detail/
获取任务详情 API:轮询扫描状态直到 state == 2:
6. 获取分析结果
扫描完成后,接口会返回完整的扫描结果对象 results[0],包含所有维度的扫描数据。
GET /server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scaninfos/
import requests
url = f"{base_url}/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scaninfos/"
headers = get_headers(user_id, token)
resp = requests.get(url, headers=headers)
result = resp.json()
results = result.get("data", {}).get("results", [])
print(result[0])
6.1. 代码检查结果(lintscan)
lint = result[0].get("lintscan", {})
print({
"issue_open_num": lint.get("issue_open_num"),
"issue_fix_num": lint.get("issue_fix_num"),
"issue_detail_num": lint.get("issue_detail_num"),
})
6.2. 圈复杂度结果(cyclomaticcomplexityscan)
cc = result[0].get("cyclomaticcomplexityscan", {})
print({
"diff_cc_num": cc.get("diff_cc_num"),
"cc_open_num": cc.get("cc_open_num"),
})
6.3. 重复代码结果(duplicatescan)
dup = result[0].get("duplicatescan", {})
print({
"duplicate_block_count": dup.get("duplicate_block_count"),
"total_duplicate_line_count": dup.get("total_duplicate_line_count"),
})
6.4. 代码统计结果(clocscan)
cloc = result.get("clocscan", {})
print({
"code_line_num": cloc.get("code_line_num"),
"comment_line_num": cloc.get("comment_line_num"),
"blank_line_num": cloc.get("blank_line_num"),
"total_line_num": cloc.get("total_line_num"),
})
6.6. 获取质量门禁状态(Quality Gate)
quality = result[0].get("qualityscan", {})
status = quality.get("status") # "success": 门禁通过,"failure": 门禁未通过, "close": 门禁未开启
description = quality.get("description")
if status == "success":
print("质量门禁通过")
elif status == "close":
print("质量门禁未开启")
else:
print(f"质量门禁未通过:{description}")
7. 获取代码检查问题数据
GET /server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/codelint/issues/
import requests
import json
url = f"{base_url}/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/codelint/issues/"
headers = get_headers(user_id, token)
resp = requests.get(url, headers=headers)
result = resp.json()
print(json.dumps(result, indent=2, ensure_ascii=False))
脚本参考
环境变量
执行脚本时,可以注入环境变量,动态调整参数,实现自动化集成。
# TCA 平台服务地址
TCA_BASE_URL=https://tca.tencent.com
# API 访问令牌
TCA_USER_ID=your_user_id
TCA_TOKEN=your_token
# 团队唯一标识
TCA_ORG_SID=your_org_sid
# 项目组唯一标识
TCA_TEAM_NAME=your_team_name
# 代码库 URL
TCA_REPO_URL=your_repo_url
# 代码库凭证 ID
TCA_SCM_ACCOUNT_ID=your_scm_account_id
# 分析方案 ID,可使用团队/项目分析方案,注意权限问题
TCA_SCHEME_ID=your_scheme_id
# 分支名称
TCA_REPO_BRANCH=your_repo_branch
# 是否增量扫描
TCA_INCR_SCAN=true
# 是否忽略指定代码库分支问题
TCA_IGNORE_MERGED_ISSUE=true
# 合并请求场景下的对比分支
TCA_IGNORE_BRANCH_ISSUE=your_ignore_branch
脚本内容
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通过 API 启动代码分析
"""
import os
import sys
import json
import requests
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from time import time, sleep
from hashlib import sha256
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('tca_ci_pipeline_saas.log')
]
)
logger = logging.getLogger(__name__)
# 开放API路径映射
TCA_OPEN_APIS = {
"project_team_list": "%s/server/main/api/orgs/{org_sid}/teams/",
"pt_repo_list": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/",
"project_list": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/",
"project_scan_list": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scans/create/",
"job_detail": "%s/server/main/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/jobs/{job_id}/detail/",
"project_analysis_scan_list": "%s/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/scaninfos/",
"project_issue_list": "%s/server/analysis/api/orgs/{org_sid}/teams/{team_name}/repos/{repo_id}/projects/{project_id}/codelint/issues/",
}
@dataclass
class TCAConfig:
"""TCA配置类"""
base_url: str
tca_token: str
user_id: str
org_sid: str
team_name: str
repo_url: str
repo_branch: str
repo_mr_branch: str
scm_account_id: int
scheme_id: int
incr_scan: bool
ignore_merged_issue: bool
ignore_branch_issue: str
timeout: int = 30
max_retries: int = 3
class TCAIntegrationError(Exception):
"""TCA集成异常"""
pass
class TCAClient:
"""TCA API客户端"""
def __init__(self, config: TCAConfig):
self.config = config
self.session = requests.Session()
def get_headers(self, user_id: Optional[str] = None, token: Optional[str] = None):
timestamp = int(time())
user_id = user_id or self.config.user_id
token = token or self.config.tca_token
token_sig = "%s%s#%s#%s%s" % (timestamp, user_id, token, user_id, timestamp)
ticket = sha256(token_sig.encode("utf-8")).hexdigest().upper()
return {
"TCA-USERID": user_id,
"TCA-TIMESTAMP": str(timestamp),
"TCA-TICKET": ticket
}
def _is_success_status(self, status_code: int) -> bool:
"""判断HTTP状态码是否表示成功(2xx)"""
return 200 <= status_code < 300
def _make_request(self, method: str, url: str, data: Optional[Dict] = None,
params: Optional[Dict] = None) -> Dict:
"""发送API请求,url需为完整路径"""
for attempt in range(self.config.max_retries):
try:
headers = self.get_headers()
response = self.session.request(
method=method,
url=url,
headers=headers,
json=data,
params=params,
timeout=self.config.timeout
)
if self._is_success_status(response.status_code):
return response.json()
elif response.status_code == 401:
raise TCAIntegrationError("TCA Token无效或已过期")
elif response.status_code == 403:
raise TCAIntegrationError("权限不足,请检查团队ID和权限设置")
elif response.status_code == 404:
raise TCAIntegrationError("请求的资源不存在")
elif response.status_code >= 500:
logger.warning(f"服务器错误,第{attempt + 1}次重试...")
if attempt == self.config.max_retries - 1:
raise TCAIntegrationError("服务器内部错误,请稍后重试")
sleep(2 ** attempt) # 指数退避
else:
raise TCAIntegrationError(f"API请求失败: {response.status_code} - {response.text}")
except requests.exceptions.Timeout:
logger.warning(f"请求超时,第{attempt + 1}次重试...")
if attempt == self.config.max_retries - 1:
raise TCAIntegrationError("请求超时,请检查网络连接")
except requests.exceptions.ConnectionError:
logger.warning(f"连接错误,第{attempt + 1}次重试...")
if attempt == self.config.max_retries - 1:
raise TCAIntegrationError("网络连接错误,请检查网络设置")
def _build_url(self, key: str, **kwargs: Any) -> str:
base = self.config.base_url.rstrip('/')
tpl = TCA_OPEN_APIS[key] % base
return tpl.format(**kwargs)
def get_project_teams(self) -> Dict:
url = self._build_url("project_team_list", org_sid=self.config.org_sid)
return self._make_request("GET", url)
def get_pt_repos(self) -> Dict:
url = self._build_url("pt_repo_list", org_sid=self.config.org_sid, team_name=self.config.team_name)
return self._make_request("GET", url)
def get_pt_repo_id(self, repo_url: Optional[str] = None) -> int:
"""根据仓库地址获取团队下对应仓库的ID(仅精确匹配)。
精确比较 `scm_url`,找不到则抛出 TCAIntegrationError。
"""
result = self.get_pt_repos()
data = result.get("data") or {}
results = data.get("results") or []
repo_url = repo_url or self.config.repo_url
if not isinstance(results, list):
raise TCAIntegrationError("获取团队仓库列表失败,返回格式不正确")
for item in results:
if not isinstance(item, dict):
continue
if repo_url == item.get("scm_url") or repo_url == item.get("format_url"):
repo_id_val = item.get("id")
if repo_id_val is not None:
return int(repo_id_val)
raise TCAIntegrationError(f"未在项目组 {self.config.team_name} 下找到仓库: {repo_url}")
def create_project_team(self) -> Dict:
"""创建项目组"""
logger.info(f"创建项目组: {self.config.team_name}")
url = self._build_url("project_team_list", org_sid=self.config.org_sid)
post_data = {
'name': self.config.team_name,
'display_name': self.config.team_name
}
result = self._make_request("POST", url, data=post_data)
return result
def register_pt_repo(self) -> Dict:
"""登记代码库(项目组维度)"""
logger.info(f"项目组: {self.config.team_name}, 创建代码库: {self.config.repo_url}")
url = self._build_url("pt_repo_list", org_sid=self.config.org_sid, team_name=self.config.team_name)
post_data = {
'scm_url': self.config.repo_url,
'scm_type': "git",
'scm_auth': {
"auth_type": "password",
"scm_account": self.config.scm_account_id
},
'created_from': "api"
}
result = self._make_request("POST", url, data=post_data)
return result
def create_project(self) -> Dict:
"""创建分析项目"""
logger.info(f"项目组: {self.config.team_name}, 创建代码库: {self.config.repo_url}的分析项目")
repo_id = self.get_pt_repo_id(self.config.repo_url)
url = self._build_url("project_list", org_sid=self.config.org_sid, team_name=self.config.team_name,
repo_id=repo_id)
post_data = {
'branch': self.config.repo_branch,
'global_scheme_id': self.config.scheme_id,
'use_scheme_template': True,
'created_from': "api"
}
result = self._make_request("POST", url, data=post_data)
return result
def get_projects(self) -> Dict:
repo_id = self.get_pt_repo_id(self.config.repo_url)
url = self._build_url("project_list", org_sid=self.config.org_sid, team_name=self.config.team_name,
repo_id=repo_id)
return self._make_request("GET", url)
def get_project_id(self, scheme_id: Optional[int] = None, branch: Optional[str] = None) -> int:
"""根据 scan_scheme.id 和 branch 获取分析项目ID。
- 从项目列表中精确匹配 `scan_scheme.id` 与 `branch`
"""
scheme_id = scheme_id or self.config.scheme_id
branch = branch or self.config.repo_branch
logger.info(f"scheme_id={scheme_id}, branch={branch}")
result = self.get_projects()
data = result.get("data") or {}
results = data.get("results") or []
if not isinstance(results, list):
raise TCAIntegrationError("获取项目列表失败,返回格式不正确")
for item in results:
if not isinstance(item, dict):
continue
item_scheme = (item.get("scan_scheme") or {}).get("id")
item_branch = item.get("branch")
if int(item_scheme) == int(scheme_id) and item_branch == branch:
proj_id = item.get("id")
if proj_id is not None:
return int(proj_id)
raise TCAIntegrationError(f"未找到匹配的分析项目,scheme_id={scheme_id}, branch={branch}")
def start_scan(self) -> Dict:
"""启动扫描任务"""
repo_id = self.get_pt_repo_id()
project_id = self.get_project_id()
url = self._build_url("project_scan_list", org_sid=self.config.org_sid,
team_name=self.config.team_name, repo_id=repo_id, project_id=project_id)
incr_scan = self.config.incr_scan
ignore_branch_issue = self.config.ignore_branch_issue
ignore_merged_issue = self.config.ignore_merged_issue
post_data = {
'incr_scan': incr_scan,
'ignore_branch_issue': ignore_branch_issue,
'ignore_merged_issue': ignore_merged_issue
}
result = self._make_request("POST", url, data=post_data)
return result
def wait_for_scan_completion(self, job_id: int) -> Dict:
"""等待分析任务状态变为 2(已完成)"""
repo_id = self.get_pt_repo_id()
project_id = self.get_project_id()
url = self._build_url("job_detail", org_sid=self.config.org_sid,
team_name=self.config.team_name, repo_id=repo_id, project_id=project_id, job_id=job_id)
max_wait_time = 600 # 最多等待 10 分钟(600秒,这里可以自定义等待时间)
interval = 30 # 每隔 30秒请求一次
start_time = int(time())
while True:
results = self._make_request("GET", url)
# 获取results中的data结果
data = results.get("data", {})
if data:
state = data.get("state")
if state == 2:
logging.info("扫描结果已出,结束等待")
break # 成功满足条件,退出循环
elapsed = int(time()) - start_time
if elapsed > max_wait_time:
logger.info(f"扫描超过{max_wait_time}秒,扫描状态仍未改变,扫描失败")
raise TCAIntegrationError(f"扫描超过{max_wait_time}秒,扫描状态仍未改变,扫描失败")
logging.info(
f"代码正在扫描中(当前 state: {data.get('state') if data else '无数据'}),等待30秒后重试...")
sleep(interval)
return results
def get_scan_info(self) -> Dict:
"""获取最新分析结果信息"""
repo_id = self.get_pt_repo_id()
project_id = self.get_project_id()
url = self._build_url("project_analysis_scan_list", org_sid=self.config.org_sid,
team_name=self.config.team_name, repo_id=repo_id, project_id=project_id)
response = self._make_request("GET", url)
results = response.get("data", {}).get("results", [])
return results[0]
def get_issue_list(self) -> Dict:
repo_id = self.get_pt_repo_id()
project_id = self.get_project_id()
url = self._build_url("project_issue_list", org_sid=self.config.org_sid,
team_name=self.config.team_name, repo_id=repo_id, project_id=project_id)
result = self._make_request("GET", url)
return result
class TCAPipeline:
"""TCA流水线集成类"""
def __init__(self, config: TCAConfig):
self.client = TCAClient(config)
self.config = config
def run_full_analysis(self) -> bool:
"""运行完整分析流程"""
# 1. 创建项目组(或复用已有 project_team)
exist_project_teams = self.client.get_project_teams()
if not self.has_team_name(exist_project_teams, self.config.team_name):
project_team = self.client.create_project_team()
# 2. 登记代码库(或复用已有 repo)
try:
self.client.get_pt_repo_id(self.config.repo_url)
except TCAIntegrationError as e:
logger.info(f"未找到已有代码库: {self.config.repo_url}, 创建新代码库")
self.client.register_pt_repo()
# 3. 在该 repo 下创建项目(或复用已有 project)
try:
self.client.get_project_id()
except TCAIntegrationError as e:
logger.info(f"未找到已有项目: {self.config.repo_url}, 创建新项目")
self.client.create_project()
# 4. 启动扫描
start_scan_result = self.client.start_scan()
job_id = self.get_job_id(start_scan_result)
# 5. 等待完成
try:
scan_result = self.client.wait_for_scan_completion(job_id)
except TCAIntegrationError as e:
logger.error(f"扫描失败: {e}")
return False
# 5. 如果代码未做修改,触发增量扫描,会显示result_code为1,显示无需扫描
result_code, result_code_msg, result_msg = self.get_result_msg(scan_result)
if result_code == 1:
logger.info(
f"代码未做修改,result_code: {result_code}, result_code_msg: {result_code_msg}, result_msg: {result_msg} 扫描信息为上一次扫描信息")
elif result_code == 0:
logger.info(
f"代码已做修改,result_code: {result_code}, result_code_msg: {result_code_msg}, result_msg: {result_msg} 扫描信息为最新扫描信息")
elif result_code >= 100:
logger.error(f"扫描失败: {result_code}, {result_code_msg}, {result_msg}")
return False
scan_info = self.client.get_scan_info()
# 5. 获取代码扫描结果
lint_scan_result = self.get_lint_scan_result(scan_info)
logger.info(f"代码扫描结果: {lint_scan_result}")
# 6. 获取圈复杂度结果
cyclomatic_complexity_result = self.get_cyclomatic_complexity_result(scan_info)
logger.info(f"圈复杂度结果: {cyclomatic_complexity_result}")
# 7. 获取重复代码结果
duplicate_code_result = self.get_duplicate_code_result(scan_info)
logger.info(f"重复代码结果: {duplicate_code_result}")
# 8. 获取代码统计结果
get_cloc_result = self.get_cloc_result(scan_info)
logger.info(f"代码统计结果: get_cloc_result")
# 9. 获取问题列表
issues = self.client.get_issue_list()
logger.info(f"问题列表: {issues}")
# 10. 获取代码门禁结果
quality_gate_result = self.get_quality_gate_result(scan_info)
logger.info(f"代码门禁结果: {quality_gate_result}")
if quality_gate_result.get("status") == "success" or quality_gate_result.get("status") == "close":
return True
else:
return False
def get_job_id(self, result: dict) -> int:
try:
job_id = result["data"]["job"]["id"]
except (KeyError, TypeError):
raise TCAIntegrationError("响应格式不正确,未找到 data.job.id")
return job_id
def get_result_msg(self, result: dict) -> Tuple[int, str, str]:
try:
result_code = result["data"]["result_code"]
result_code_msg = result["data"]["result_code_msg"]
result_msg = result["data"]["result_msg"]
except (KeyError, TypeError):
raise TCAIntegrationError("响应格式不正确,未找到 data.result_code相关信息")
return result_code, result_code_msg, result_msg
def get_lint_scan_result(self, results: dict) -> dict:
"""获取代码扫描结果(lintscan)"""
lint = results.get("lintscan") or {}
return {
"issue_open_num": lint.get("issue_open_num"),
"issue_fix_num": lint.get("issue_fix_num"),
"issue_detail_num": lint.get("issue_detail_num"),
}
def get_cyclomatic_complexity_result(self, results: dict) -> dict:
"""获取圈复杂度结果(cyclomaticcomplexityscan)"""
cc = results.get("cyclomaticcomplexityscan") or {}
return {
"diff_cc_num": cc.get("diff_cc_num"),
"cc_open_num": cc.get("cc_open_num"),
}
def get_duplicate_code_result(self, results: dict) -> dict:
"""获取重复代码结果(duplicatescan)"""
dup = results.get("duplicatescan") or {}
return {
"duplicate_block_count": dup.get("duplicate_block_count"),
"total_duplicate_line_count": dup.get("total_duplicate_line_count"),
}
def get_cloc_result(self, results: dict) -> dict:
"""获取代码统计结果(clocscan)"""
cloc = results.get("clocscan") or {}
return {
"code_line_num": cloc.get("code_line_num"),
"comment_line_num": cloc.get("comment_line_num"),
"blank_line_num": cloc.get("blank_line_num"),
"total_line_num": cloc.get("total_line_num"),
}
def get_quality_gate_result(self, results: dict) -> dict:
"""获取质量门禁状态(qualityscan)"""
quality = results.get("qualityscan") or {}
return {
"status": quality.get("status"),
"description": quality.get("description"),
}
def has_team_name(self, response: dict, team_name: str) -> bool:
"""
判断接口返回的 project_team 数据中是否存在 name == team_name 的项目组
:param response: 接口返回的完整字典数据
:return: True 表示存在名为 team_name 的团队,否则 False
"""
try:
teams = response.get("data", [])
for team in teams:
if team.get("name") == team_name:
return True
return False
except Exception as e:
TCAIntegrationError(f"解析 team 数据时出错: {e}")
return False
def _manual_load_dotenv(candidates: List[str]) -> int:
"""简易 .env 解析器:在未安装 python-dotenv 时,手动加载 KEY=VALUE 到 os.environ。
仅在变量未设置时覆盖。返回成功加载的变量数量。"""
loaded = 0
for path in candidates:
if not path:
continue
if os.path.exists(path) and os.path.isfile(path):
try:
with open(path, 'r', encoding='utf-8') as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith('#'):
continue
if '=' not in line:
continue
key, val = line.split('=', 1)
key = key.strip()
val = val.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = val
loaded += 1
logger.info(f"已从 .env 加载 {loaded} 个变量: {path}")
except Exception as e:
logger.warning(f"读取 .env 失败: {path} - {e}")
return loaded
def str_to_bool(value: str) -> bool:
"""将字符串转换为布尔值(忽略大小写)"""
return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes']
def load_config_from_env() -> TCAConfig:
"""仅从环境变量与 .env 文件加载配置(不再读取 config.py)。
优先级:进程环境变量 > .env > 默认值
支持两处 .env:
- 与本脚本同目录的 .env
- 当前工作目录的 .env
"""
# 1) 优先尝试 python-dotenv
dotenv_loaded = False
try:
from dotenv import load_dotenv # type: ignore
from pathlib import Path
dotenv_paths = [
Path(__file__).with_name('.env'),
Path.cwd() / '.env'
]
for p in dotenv_paths:
if p.exists():
load_dotenv(dotenv_path=str(p), override=False)
dotenv_loaded = True
logger.info(f"已使用 python-dotenv 加载 .env: {p}")
break
except Exception:
pass
# 2) 若未通过 python-dotenv 加载,则手动加载
if not dotenv_loaded:
script_env = os.path.join(os.path.dirname(__file__), '.env')
cwd_env = os.path.join(os.getcwd(), '.env')
_manual_load_dotenv([script_env, cwd_env])
# 3) 读取环境变量
base_url = os.getenv('TCA_BASE_URL', 'https://tca.tencent.com')
tca_token = os.getenv('TCA_TOKEN', '')
user_id = os.getenv('TCA_USER_ID', '')
org_sid = os.getenv('TCA_ORG_SID', '')
team_name = os.getenv('TCA_TEAM_NAME', '')
repo_url = os.getenv('TCA_REPO_URL', '')
repo_branch = os.getenv('TCA_REPO_BRANCH', '')
repo_mr_branch = os.getenv('TCA_REPO_MR_BRANCH', '')
scm_account_id = os.getenv('TCA_SCM_ACCOUNT_ID', '')
scheme_id = os.getenv('TCA_SCHEME_ID', '')
incr_scan = os.getenv('TCA_INCR_SCAN', False)
ignore_branch_issue = os.getenv('TCA_IGNORE_BRANCH_ISSUE', '')
ignore_merged_issue = os.getenv('TCA_IGNORE_MERGED_ISSUE', False)
return TCAConfig(
base_url=base_url,
tca_token=tca_token,
user_id=user_id,
org_sid=org_sid,
team_name=team_name,
repo_url=repo_url,
scm_account_id=int(scm_account_id),
repo_branch=repo_branch,
repo_mr_branch=repo_mr_branch,
scheme_id=int(scheme_id),
incr_scan=str_to_bool(incr_scan),
ignore_branch_issue=ignore_branch_issue,
ignore_merged_issue=str_to_bool(ignore_merged_issue),
)
def main():
"""主函数 - 示例用法"""
# 加载配置(优先级:环境变量 > config.py > 默认值)
config = load_config_from_env()
logger.info(
f"已加载配置: base_url={config.base_url}, "
f"org_sid={'***' if config.org_sid else ''}, "
f"tca_token={'***' if config.tca_token else ''}, "
f"user_id={'***' if config.user_id else ''}, "
f"team_name={config.team_name}, "
f"repo_url={config.repo_url}, "
f"repo_branch={config.repo_branch}, "
f"repo_mr_branch={config.repo_mr_branch}, "
f"scm_account_id={config.scm_account_id}, "
f"scheme_id={config.scheme_id}, "
f"incr_scan={config.incr_scan}, "
f"ignore_branch_issue={config.ignore_branch_issue}, "
f"ignore_merged_issue={config.ignore_merged_issue}"
)
# 验证必要配置
if (not config.org_sid \
or not config.tca_token \
or not config.user_id \
or not config.team_name \
or not config.repo_url \
or not config.repo_branch \
or not config.scm_account_id \
or not config.scheme_id):
logger.error(
"缺少必要配置,请设置 TCA_USER_ID、TCA_TOKEN、TCA_ORG_SID、TCA_TEAM_NAME、TCA_REPO_URL、TCA_REPO_BRANCH、TCA_SCM_ACCOUNT_ID、TCA_SCHEME_ID")
sys.exit(1)
# 创建流水线实例
pipeline = TCAPipeline(config)
scan_result = pipeline.run_full_analysis()
if scan_result:
logger.info("流水线执行成功")
sys.exit(0)
else:
logger.error("流水线执行失败")
sys.exit(1)
if __name__ == "__main__":
main()