bitbucketRestApi结合python和RPA获取报表

Bitbucket 是 Atlassian 提供的代码托管平台,支持 Git 和 Mercurial 仓库,提供团队协作、Pull Request、分支权限管理等功能,并与 Jira、Confluence 等工具深度集成,适合团队进行版本控制、代码审查和 CI/CD 流水线管理。

Bitbucket REST API 是 Bitbucket 提供的 HTTP 接口,允许通过程序化方式访问和操作仓库,例如获取仓库信息、提交记录、分支、Pull Request 等。它常用于自动化管理项目,如创建分支、触发构建或获取报表,可以配合 Python、RPA 等工具实现自动化任务。

bitbucketRestApi使用方法

​ 官方文档REST Resources Provided By: Bitbucket Server - REST

​ 官方文档非常详细,各类功能均有案例说明。

REST API 的用途包括但不限于:

  1. 将 Bitbucket 与其他应用程序集成;
  2. 创建与 Bitbucket 交互的脚本;
  3. 开发增强 Bitbucket UI 的插件,使用 REST 与后端交互;
  4. 获取各类信息。

​ 这里基于案例:

  1. 获取所有项目

  2. 以及每个项目下的仓库

  3. 每个仓库下近一年、二年、三年的提交人数

    进行介绍使用方法,难度由简单到复杂。

    最终结果如下:

image-20250828161841810

1. 案例操作步骤

1.1 获取所有项目信息

​ 可以通过curl或者python等等方式获取api信息。

1
curl -u username:password http://IP:port/rest/api/1.0/projects?limit=1000

​ 通过观察可以发现每个项目有唯一的key。之后可以通过key查询项目下的仓库。

image-20250828161502260

1.2 获取单个项目下的信息

1.2.1 获取单个项目下的管理员信息

通过刚刚的方法获取到了key之后就可以通过key查询项目信息。运行python文件getSingleProjectAdmin.py即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import requests

# Server base URL and the (user, password) pair passed as `auth=` to requests.
# Both values are placeholders and must be replaced before running.
BASE_URL = "http://IP:port"
AUTH = ("username", "password")

def get_project_admins(project_key):
    """Return the display names of users holding PROJECT_ADMIN on a project.

    Requests one page (``limit=1000``) of
    ``/rest/api/1.0/projects/{project_key}/permissions/users``.

    Args:
        project_key: Key of the project to inspect.

    Returns:
        A list of ``displayName`` strings. On any failure the error is
        printed and an empty list is returned (best-effort behaviour).
    """
    try:
        resp = requests.get(
            f"{BASE_URL}/rest/api/1.0/projects/{project_key}/permissions/users?limit=1000",
            auth=AUTH,
            # Without a timeout an unreachable server blocks the script forever.
            timeout=30,
        )
        resp.raise_for_status()
        users = resp.json().get('values', [])
        return [u['user']['displayName'] for u in users if u['permission'] == 'PROJECT_ADMIN']
    except Exception as e:
        print(f"Error getting admins for project {project_key}: {e}")
        return []

if __name__ == "__main__":
    # Placeholder key; replace with a real project key before running.
    key = "project key"
    print(f"Project {key} admins: {get_project_admins(key)}")

image-20250828162323096

1.2.2 获取单个项目下的代码仓库信息

以类似的方法获取。运行python文件 getSingleProjectRepos.py即可获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import requests

# Server base URL and the (user, password) pair passed as `auth=` to requests.
# Both values are placeholders and must be replaced before running.
BASE_URL = "http://IP:port"
AUTH = ("username", "password")

def get_project_repos(project_key):
    """Return name/slug/description records for the repositories of a project.

    Requests one page (``limit=1000``) of
    ``/rest/api/1.0/projects/{project_key}/repos``.

    Args:
        project_key: Key of the project to list.

    Returns:
        A list of dicts with the keys ``name``, ``slug`` and ``description``.
        On any exception the error is printed and an empty list is returned.
    """
    try:
        response = requests.get(
            f"{BASE_URL}/rest/api/1.0/projects/{project_key}/repos?limit=1000",
            auth=AUTH,
        )
        response.raise_for_status()
        return [
            {
                "name": repo.get('name', 'N/A'),
                "slug": repo.get('slug', 'N/A'),
                "description": repo.get('description', ''),
            }
            for repo in response.json().get('values', [])
        ]
    except Exception as e:
        print(f"Error getting repos for project {project_key}: {e}")
        return []

if __name__ == "__main__":
    # Placeholder key; replace with a real project key before running.
    key = "project key"
    print(f"Project {key} repos: {get_project_repos(key)}")


image-20250828162812005

1.3 获取单个仓库的提交信息

可以通过项目key和repo的slug查询到仓库提交信息,进而判断每一个仓库的活跃度。并且支持细粒度的时间范围查询。运行python文件 getSingleProjectRepos.py即可实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import requests
import base64
from datetime import datetime
from collections import defaultdict

# Server address and account (placeholders; replace before running).
BASE_URL = "http://IP:port"
USERNAME = "username"
PASSWORD = "password"


def get_auth_header(user, pwd):
    """Build the HTTP Basic ``Authorization`` header for *user* and *pwd*.

    The credentials are joined as ``user:pwd``, UTF-8 encoded and
    base64-encoded.

    Returns:
        A one-entry dict: ``{"Authorization": "Basic <token>"}``.
    """
    token = base64.b64encode(f"{user}:{pwd}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Header dict sent with every request issued by this script.
HEADERS = get_auth_header(USERNAME, PASSWORD)

# Fetch the repositories of a project
def get_project_repos(project_key):
    """Return the raw repository entries of a project.

    Requests one page (``limit=1000``) of the project's ``repos`` endpoint and
    returns the ``values`` list of the JSON response (empty list when absent).
    A non-success HTTP status raises through ``raise_for_status()``.
    """
    response = requests.get(
        f"{BASE_URL}/rest/api/1.0/projects/{project_key}/repos?limit=1000",
        headers=HEADERS,
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("values", [])

# Fetch the commits of one repository that fall inside a time window (paginated)
def get_commits_in_range(project_key, repo_slug, start_date, end_date):
    """Collect commits whose author timestamp lies in [start_date, end_date].

    Walks the paged ``commits`` endpoint, following ``nextPageStart`` until the
    response reports ``isLastPage``.

    Args:
        project_key: Key of the project owning the repository.
        repo_slug: Slug of the repository.
        start_date: Naive ``datetime``; inclusive lower bound, compared with
            the commit's author time expressed in UTC.
        end_date: Naive ``datetime``; inclusive upper bound.

    Returns:
        A list of ``{"author": <name>, "date": <naive UTC datetime>}`` dicts.

    Raises:
        requests.HTTPError: If a page request returns an error status.
    """
    # Local import: only `datetime` itself is imported at module level.
    from datetime import timezone

    commits = []
    start = 0
    while True:
        url = f"{BASE_URL}/rest/api/1.0/projects/{project_key}/repos/{repo_slug}/commits?limit=1000&start={start}"
        resp = requests.get(url, headers=HEADERS)
        resp.raise_for_status()
        data = resp.json()
        for c in data.get("values", []):
            date_ts = c.get("authorTimestamp")  # milliseconds since the epoch
            if date_ts is None:
                # A commit without a timestamp cannot be placed in the window;
                # skip it instead of crashing on `None / 1000`.
                continue
            # `author` may be present but null, so guard before reading `name`.
            author = (c.get("author") or {}).get("name", "unknown")
            # Same naive-UTC value that utcfromtimestamp() produced, without
            # relying on that deprecated function.
            commit_date = datetime.fromtimestamp(date_ts / 1000, tz=timezone.utc).replace(tzinfo=None)
            if start_date <= commit_date <= end_date:
                commits.append({"author": author, "date": commit_date})
        if data.get("isLastPage", True):
            break
        next_start = data.get("nextPageStart")
        if next_start is None or next_start <= start:
            # No forward cursor: stop rather than re-requesting the same page
            # forever.
            break
        start = next_start
    return commits

# Count commits per developer
def get_dev_activity(commits):
    """Return a plain dict mapping each commit author to their commit count.

    Args:
        commits: Iterable of dicts that each carry an ``"author"`` key.

    Returns:
        ``{author: count}``, keyed in order of first appearance.
    """
    tally = {}
    for entry in commits:
        who = entry["author"]
        tally[who] = tally.get(who, 0) + 1
    return tally

def get_project_activity_fixed(project_key, start_date=None, end_date=None):
    """Report per-repository developer activity for every repo of a project.

    Args:
        project_key (str): Project key.
        start_date (datetime | None): Inclusive start of the window. Defaults
            to 2020-01-01, the value that used to be hard-coded.
        end_date (datetime | None): Inclusive end of the window. Defaults to
            2025-08-28, the value that used to be hard-coded.

    Returns:
        List[Dict]: One entry per repository with the keys ``repo_name``,
        ``repo_slug``, ``start_date``, ``end_date`` (both as ``date``) and
        ``activity`` (author -> commit count).
    """
    # Keep the historical fixed window as the default so that existing
    # single-argument calls behave exactly as before.
    if start_date is None:
        start_date = datetime(2020, 1, 1)
    if end_date is None:
        end_date = datetime(2025, 8, 28)

    repos = get_project_repos(project_key)
    results = []

    for r in repos:
        commits = get_commits_in_range(project_key, r["slug"], start_date, end_date)
        activity = get_dev_activity(commits)
        results.append({
            "repo_name": r["name"],
            "repo_slug": r["slug"],
            "start_date": start_date.date(),
            "end_date": end_date.date(),
            "activity": activity
        })

    return results


if __name__ == "__main__":
    # Placeholder key; replace with a real project key before running.
    print(get_project_activity_fixed("project key"))

image-20250828163812265

2. 最终结果

整合以上所有代码即可获取最终结果。

2.1 获取所有项目下的基本信息

包括:ID Project Key Project Name Repo Name Repo Slug Description Admins。运行python文件getAllProjectsAndRepos.py即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import csv
import requests

# -----------------------------
# Credentials and server address.
# Read from environment variables when set; otherwise fall back to the
# placeholder values so the script behaves as before.
# -----------------------------
USERNAME = os.environ.get("BITBUCKET_USERNAME", "username")
PASSWORD = os.environ.get("BITBUCKET_PASSWORD", "password")
AUTH = (USERNAME, PASSWORD)
BASE_URL = os.environ.get("BITBUCKET_BASE_URL", "http://IP:port")

# -----------------------------
# Fetch all projects
# -----------------------------
def get_all_projects():
    """Return the project entries reported by the server.

    Requests one page (``limit=1000``) of ``/rest/api/1.0/projects`` and
    returns the ``values`` list of the JSON response. On any exception the
    error is printed and an empty list is returned.
    """
    try:
        response = requests.get(f"{BASE_URL}/rest/api/1.0/projects?limit=1000", auth=AUTH)
        response.raise_for_status()
        return response.json().get('values', [])
    except Exception as e:
        print(f"Error getting projects: {e}")
        return []

# -----------------------------
# Fetch the administrators of a project
# -----------------------------
def get_project_admins(project_key):
    """Return the display names of users holding PROJECT_ADMIN on a project.

    Requests one page (``limit=1000``) of the project's ``permissions/users``
    endpoint. On any exception the error is printed and an empty list is
    returned.
    """
    try:
        endpoint = f"{BASE_URL}/rest/api/1.0/projects/{project_key}/permissions/users?limit=1000"
        response = requests.get(endpoint, auth=AUTH)
        response.raise_for_status()
        admin_names = []
        for entry in response.json().get('values', []):
            if entry['permission'] == 'PROJECT_ADMIN':
                admin_names.append(entry['user']['displayName'])
        return admin_names
    except Exception as e:
        print(f"Error getting admins for project {project_key}: {e}")
        return []

# -----------------------------
# Fetch the repositories of a project
# -----------------------------
def get_project_repos(project_key):
    """Return name/slug/description records for the repositories of a project.

    Requests one page (``limit=1000``) of the project's ``repos`` endpoint.
    On any exception the error is printed and an empty list is returned.
    """
    try:
        endpoint = f"{BASE_URL}/rest/api/1.0/projects/{project_key}/repos?limit=1000"
        response = requests.get(endpoint, auth=AUTH)
        response.raise_for_status()
        return [
            {
                "name": repo.get('name', 'N/A'),
                "slug": repo.get('slug', 'N/A'),
                "description": repo.get('description', ''),
            }
            for repo in response.json().get('values', [])
        ]
    except Exception as e:
        print(f"Error getting repos for project {project_key}: {e}")
        return []

# -----------------------------
# Entry point: write the CSV report
# -----------------------------
if __name__ == "__main__":
    csv_file = "all_projects_repos.csv"
    header = ["ID", "Project Key", "Project Name", "Repo Name", "Repo Slug", "Description", "Admins"]
    # Stand-in entry so that a project without repositories still gets a row.
    no_repo = {"name": "", "slug": "", "description": ""}

    all_projects = get_all_projects()
    with open(csv_file, mode="w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)

        row_id = 0  # running ID across all rows, starting at 1
        for p in all_projects:
            project_key, project_name = p.get("key"), p.get("name")
            admins = get_project_admins(project_key)
            repos = get_project_repos(project_key)
            admin_cell = ";".join(admins)
            for r in repos or [no_repo]:
                row_id += 1
                writer.writerow([row_id, project_key, project_name, r["name"], r["slug"], r["description"], admin_cell])

    print(f"CSV file generated: {csv_file}")

image-20250828163612594

2.2 获取所有项目所有仓库多年的提交信息

​ 利用上一步获取到的 CSV 文件,结合 RPA 技术即可实现最终效果。也可以使用 Python 的 Excel 模块,但是出问题概率高。

首先复制上一步所有的csv内容到excel表格。得到以下表格的A到G列。

2.2.1 获取基本信息

image-20250828165332470

2.2.2 RPA操作

2.2.2.1准备影刀RPA

​ 下载影刀RPA下载影刀客户端-影刀RPA - 影刀官网

2.2.2.2 创建RPA项目

image-20250829091254852

2.2.2.3 编写RPA模块

接下来使用RPA。编写获取提交信息的模块,也是基于python。

image-20250829091332147

模块源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from datetime import datetime
from collections import defaultdict
import requests
import base64

# Server address and account (placeholders; replace before running).
BASE_URL = "http://IP:port"
USERNAME = "username"
PASSWORD = "password"


def get_auth_header(user, pwd):
    """Return the HTTP Basic ``Authorization`` header for the given account.

    ``user:pwd`` is UTF-8 encoded and then base64-encoded to form the token.
    """
    raw = f"{user}:{pwd}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Header dict sent with every request issued by this module.
HEADERS = get_auth_header(USERNAME, PASSWORD)

def get_project_repos(project_key):
    """Return the raw repository entries of a project.

    Requests one page (``limit=1000``) of the project's ``repos`` endpoint and
    returns the ``values`` list of the JSON response (empty list when absent).
    A non-success HTTP status raises through ``raise_for_status()``.
    """
    response = requests.get(
        f"{BASE_URL}/rest/api/1.0/projects/{project_key}/repos?limit=1000",
        headers=HEADERS,
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("values", [])

def get_commits_in_range(project_key, repo_slug, start_date, end_date):
    """Collect commits whose author timestamp lies in [start_date, end_date].

    Walks the paged ``commits`` endpoint, following ``nextPageStart`` until the
    response reports ``isLastPage``.

    Args:
        project_key: Key of the project owning the repository.
        repo_slug: Slug of the repository.
        start_date: Naive ``datetime``; inclusive lower bound, compared with
            the commit's author time expressed in UTC.
        end_date: Naive ``datetime``; inclusive upper bound.

    Returns:
        A list of ``{"author": <name>, "date": <naive UTC datetime>}`` dicts.

    Raises:
        requests.HTTPError: If a page request returns an error status.
    """
    # Local import: only `datetime` itself is imported at module level.
    from datetime import timezone

    commits = []
    start = 0
    while True:
        url = f"{BASE_URL}/rest/api/1.0/projects/{project_key}/repos/{repo_slug}/commits?limit=1000&start={start}"
        resp = requests.get(url, headers=HEADERS)
        resp.raise_for_status()
        data = resp.json()
        for c in data.get("values", []):
            date_ts = c.get("authorTimestamp")  # milliseconds since the epoch
            if date_ts is None:
                # A commit without a timestamp cannot be placed in the window;
                # skip it instead of crashing on `None / 1000`.
                continue
            # `author` may be present but null, so guard before reading `name`.
            author = (c.get("author") or {}).get("name", "unknown")
            # Same naive-UTC value that utcfromtimestamp() produced, without
            # relying on that deprecated function.
            commit_date = datetime.fromtimestamp(date_ts / 1000, tz=timezone.utc).replace(tzinfo=None)
            if start_date <= commit_date <= end_date:
                commits.append({"author": author, "date": commit_date})
        if data.get("isLastPage", True):
            break
        next_start = data.get("nextPageStart")
        if next_start is None or next_start <= start:
            # No forward cursor: stop rather than re-requesting the same page
            # forever.
            break
        start = next_start
    return commits

def get_dev_activity(commits):
    """Count commits per author.

    Args:
        commits: Iterable of dicts that each carry an ``"author"`` key.

    Returns:
        A plain dict ``{author: count}``, keyed in order of first appearance.
    """
    per_author = {}
    for record in commits:
        name = record["author"]
        per_author[name] = per_author.get(name, 0) + 1
    return per_author

def get_repo_activity(project_key, repo_slug, start_date, end_date):
    """Return developer activity for one repository within a time window.

    Args:
        project_key (str): Project key.
        repo_slug (str): Repository slug.
        start_date (datetime): Start of the window.
        end_date (datetime): End of the window.

    Returns:
        Dict: {
            "repo_name": repository name,
            "repo_slug": repository slug,
            "start_date": date,
            "end_date": date,
            "activity": {developer: commit count}
        }

    Raises:
        ValueError: If no repository of the project has the given slug.
    """
    # Look the repository up in the project's listing to obtain its name.
    repo_info = None
    for candidate in get_project_repos(project_key):
        if candidate['slug'] == repo_slug:
            repo_info = candidate
            break
    if not repo_info:
        raise ValueError(f"仓库 {repo_slug} 不存在于项目 {project_key}")

    activity = get_dev_activity(
        get_commits_in_range(project_key, repo_slug, start_date, end_date)
    )

    report = {}
    report["repo_name"] = repo_info['name']
    report["repo_slug"] = repo_info['slug']
    report["start_date"] = start_date.date()
    report["end_date"] = end_date.date()
    report["activity"] = activity
    return report

def get_repo_activity_fixed_1(project_key, repo_slug):
    """Return developer activity for one repository over a fixed window.

    The window is 2023-01-01 through 2024-01-01. Unlike the sibling helpers,
    this one does not look the repository up, so the result carries no
    ``repo_name``.

    Args:
        project_key (str): Project key.
        repo_slug (str): Repository slug.

    Returns:
        Dict: repository slug, window start/end (as ``date``) and the
        per-developer commit counts.
    """
    # Fixed window
    window_start, window_end = datetime(2023, 1, 1), datetime(2024, 1, 1)

    activity = get_dev_activity(
        get_commits_in_range(project_key, repo_slug, window_start, window_end)
    )

    report = {"repo_slug": repo_slug}
    report["start_date"] = window_start.date()
    report["end_date"] = window_end.date()
    report["activity"] = activity
    return report

def get_repo_activity_fixed_2(project_key, repo_slug):
    """Return developer activity for one repository over a fixed window.

    The window is 2024-01-01 through 2025-01-01.

    Args:
        project_key (str): Project key.
        repo_slug (str): Repository slug.

    Returns:
        Dict: repository name, repository slug, window start/end (as ``date``)
        and the per-developer commit counts.

    Raises:
        ValueError: If no repository of the project has the given slug.
    """
    # get_repo_activity performs the same lookup, query and result assembly;
    # only the window is specific to this helper.
    return get_repo_activity(
        project_key,
        repo_slug,
        datetime(2024, 1, 1),
        datetime(2025, 1, 1),
    )
def get_repo_activity_fixed_3(project_key, repo_slug):
    """Return developer activity for one repository over a fixed window.

    The window is 2023-01-01 through 2024-01-01.

    NOTE(review): this is the same window as get_repo_activity_fixed_1; it
    looks like a different year was intended here -- confirm the dates.

    Args:
        project_key (str): Project key.
        repo_slug (str): Repository slug.

    Returns:
        Dict: repository name, repository slug, window start/end (as ``date``)
        and the per-developer commit counts.

    Raises:
        ValueError: If no repository of the project has the given slug.
    """
    # get_repo_activity performs the same lookup, query and result assembly;
    # only the window is specific to this helper.
    return get_repo_activity(
        project_key,
        repo_slug,
        datetime(2023, 1, 1),
        datetime(2024, 1, 1),
    )
# Example invocation
if __name__ == "__main__":
    # Placeholder identifiers; replace with a real project key / repo slug.
    project_key = "project key"
    repo_slug = "stec-project key-app"
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2025, 8, 26)

    repo_activity = get_repo_activity(project_key, repo_slug, start_date, end_date)
    print(f"仓库: {repo_activity['repo_name']} ({repo_activity['repo_slug']})")
    # Separate the two dates; printed back to back they are unreadable
    # (e.g. "2020-01-012025-08-26").
    print(f"时间段: {repo_activity['start_date']} ~ {repo_activity['end_date']}")
    for dev, count in repo_activity['activity'].items():
        print(f" {dev}: {count}次")

2.2.2.4 编写RPA

RPA流程主要操作:

image-20250828165658109

其中:魔法指令的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from datetime import date

from typing import *
try:
from xbot.app.logging import trace as print
except:
from xbot import print


def format_activity_output(input_data):
    """
    title: 格式化输出活动数据
    description: 从输入的字典中提取并格式化输出 % activity % 字段的内容,支持处理包含datetime对象的字典。
    inputs:
        - input_data (dict): 包含activity字段的字典数据,eg: "{'repo_name': 'stec-project key-app', 'start_date': datetime.date(2020, 1, 1), 'activity': {'zhangjian': 11, 'Jingyue Lou': 1}}"
    outputs:
        - formatted_activity (str): 格式化后的活动数据,eg: "zhangjian: 11\nJingyue Lou: 1"
    """
    # NOTE(review): the docstring above looks like structured metadata
    # (title / description / inputs / outputs) presumably consumed by the RPA
    # platform, so its text is kept unchanged -- confirm before editing it.
    #
    # Purpose: render input_data["activity"] (a dict of name -> count) as one
    # "name: count" line per entry, with trailing whitespace stripped.
    # Raises ValueError when input_data is not a dict, lacks the "activity"
    # key, or when "activity" is not a dict.

    # Guard clauses: validate the container, the key, then the payload type.
    if not isinstance(input_data, dict):
        raise ValueError("输入数据必须是字典格式")
    if 'activity' not in input_data:
        raise ValueError("输入数据中不包含'activity'字段")

    per_developer = input_data['activity']
    if not isinstance(per_developer, dict):
        raise ValueError("activity字段必须是字典格式")

    lines = [f"{name}: {count}" for name, count in per_developer.items()]
    # rstrip() mirrors the original clean-up of the trailing newline and any
    # trailing whitespace.
    return "\n".join(lines).rstrip()

2.2.3 运行RPA

image-20250828165821017

2.2.4 结束