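Ever noticed that kubectl get --watch lags noticeably when you are writing a custom Operator?
On a large cluster, does kubectl get take several seconds just to fetch Pod information?
When syncing configuration across clusters, is serial execution painfully slow?
When collecting Pod metrics for monitoring, do you still have to parse kubectl's table output?
Have you ever considered operating K8s without kubectl?
kubectl is really just an HTTP client: every command ends up as a REST call to the API Server. Put simply, kubectl is curl with authentication. So why not call the API directly? It can be faster and more flexible, and it needs no kubectl binary.
This is not a knock on kubectl. The point is that in some scenarios, calling the API directly is the better choice.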
# kubectl command
kubectl get pods -n default
# Equivalent API request
curl https://apiserver:6443/api/v1/namespaces/default/pods \
  --header "Authorization: Bearer $OPSNOT_TOKEN" \
  --cacert /path/to/opsnot-ca.crt
This is easy to verify: add the -v=8 flag and see what kubectl actually does:
kubectl get pods -v=8
# The output shows the full HTTP request details, for example:
# I1123 10:23:45.123456 request.go:1234] GET https://10.0.0.1:6443/api/v1/namespaces/default/pods
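To make the mapping from a kubectl command to a REST path concrete, here is a minimal Python sketch. The helper name build_api_path and its parameters are made up for illustration and are not part of any library. It relies only on the K8s convention that the core group is served under /api/v1 and named groups under /apis/<group>/<version>.

```python
def build_api_path(resource, version="v1", group="", namespace=None, name=None):
    """Build the REST path for a K8s resource (illustrative helper)."""
    # The core group is served under /api, named groups under /apis/<group>
    prefix = f"/api/{version}" if group == "" else f"/apis/{group}/{version}"
    parts = [prefix]
    if namespace is not None:
        parts.append(f"namespaces/{namespace}")
    parts.append(resource)
    if name is not None:
        parts.append(name)
    return "/".join(parts)


# kubectl get pods -n default
print(build_api_path("pods", namespace="default"))
# kubectl get deployment myapp -n prod
print(build_api_path("deployments", group="apps", namespace="prod", name="myapp"))
```

Prepend the API Server address (for example https://apiserver:6443) to get the full URL that curl would call.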
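The K8s API follows REST conventions and supports the standard HTTP methods: GET (read, list, watch), POST (create), PUT (replace), PATCH (partial update), and DELETE (delete).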
One pitfall: PATCH accepts three different Content-Type values:
- application/strategic-merge-patch+json (K8s-specific, kubectl's default)
- application/merge-patch+json (RFC 7386 standard)
- application/json-patch+json (RFC 6902 standard)
# Strategic merge patch: kubectl's default, merges arrays according to per-field strategies
# Note: containers are merged by name, so only the listed fields change and the rest are kept
curl -X PATCH https://apiserver:6443/api/v1/namespaces/default/pods/nginx \
  -H "Content-Type: application/strategic-merge-patch+json" \
  -H "X-Opsnot-Client: direct-api" \
  -d '{"spec":{"containers":[{"name":"nginx","image":"nginx:1.21"}]}}'
# JSON patch: an explicit list of operations, similar to a git diff
curl -X PATCH https://apiserver:6443/api/v1/namespaces/default/pods/nginx \
  -H "Content-Type: application/json-patch+json" \
  -d '[{"op":"replace","path":"/spec/containers/0/image","value":"nginx:1.21"}]'
# JSON merge patch: a simple recursive merge; objects are merged key by key,
# but arrays such as containers are replaced as a whole, so the list sent must be complete
curl -X PATCH https://apiserver:6443/api/v1/namespaces/default/pods/nginx \
  -H "Content-Type: application/merge-patch+json" \
  -d '{"spec":{"containers":[{"name":"nginx","image":"nginx:1.21"}]}}'
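The array behavior of JSON merge patch is easy to get wrong, so here is a small local sketch of the merge algorithm described in the JSON Merge Patch RFC. It runs entirely in Python and is not the API Server's implementation. The Pod dict and its sidecar container are hypothetical sample data.

```python
import copy


def json_merge_patch(target, patch):
    """Apply a JSON merge patch to target and return the result."""
    if not isinstance(patch, dict):
        # Non-object patches (including arrays) replace the target entirely
        return copy.deepcopy(patch)
    result = copy.deepcopy(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # null means "delete this key"
        else:
            result[key] = json_merge_patch(result.get(key), value)
    return result


# Hypothetical Pod with two containers
pod = {"spec": {"restartPolicy": "Always", "containers": [
    {"name": "nginx", "image": "nginx:1.20"},
    {"name": "sidecar", "image": "envoy:1.27"},
]}}
patch = {"spec": {"containers": [{"name": "nginx", "image": "nginx:1.21"}]}}

merged = json_merge_patch(pod, patch)
print(merged["spec"]["restartPolicy"])                     # sibling keys survive
print([c["name"] for c in merged["spec"]["containers"]])   # the list was replaced
```

The sidecar entry is gone after the merge, which is exactly the case where a strategic merge patch (merging by container name) behaves differently.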
Inside a Pod this works out of the box, because the token and CA certificate are already mounted:
# Paths inside the Pod
OPSNOT_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
OPSNOT_CACERT=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
OPSNOT_APISERVER=https://kubernetes.default.svc
# Direct request
# Note: never hard-code the token in production; read the mounted ServiceAccount token
curl --cacert "$OPSNOT_CACERT" \
  --header "Authorization: Bearer $OPSNOT_TOKEN" \
  "$OPSNOT_APISERVER/api/v1/namespaces/default/pods"
# Extract the client certificate and key for the current context from kubeconfig
kubectl config view --raw --minify --flatten \
  -o jsonpath='{.users[0].user.client-certificate-data}' | base64 -d > /tmp/opsnot-client.crt
kubectl config view --raw --minify --flatten \
  -o jsonpath='{.users[0].user.client-key-data}' | base64 -d > /tmp/opsnot-client.key
# --cacert is needed unless the cluster CA is already trusted by the system
curl --cert /tmp/opsnot-client.crt \
  --key /tmp/opsnot-client.key \
  --cacert /path/to/opsnot-ca.crt \
  https://apiserver:6443/api/v1/pods
# Start a proxy that handles authentication automatically
# Note: it listens on 127.0.0.1 only by default; be careful if you open it up for remote access
kubectl proxy --port=8080 &
# Now the API can be reached directly, no token needed
curl http://localhost:8080/api/v1/namespaces/default/pods
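The in-Pod variant translates to Python in a few lines. This is a sketch under assumptions: the helper name load_in_cluster_auth is invented here, and it expects the standard ServiceAccount mount directory containing the token, ca.crt and namespace files.

```python
from pathlib import Path

SA_DIR = "/var/run/secrets/kubernetes.io/serviceaccount"


def load_in_cluster_auth(sa_dir=SA_DIR):
    """Return (headers, ca_path, namespace) read from a mounted ServiceAccount."""
    base = Path(sa_dir)
    # Read the token on every call instead of caching it: mounted tokens can be rotated
    token = (base / "token").read_text().strip()
    namespace = (base / "namespace").read_text().strip()
    headers = {"Authorization": f"Bearer {token}"}
    return headers, str(base / "ca.crt"), namespace
```

With requests, the result would typically be used as requests.get(url, headers=headers, verify=ca_path).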
In a Jenkins pipeline you can skip installing kubectl and call the API directly:
// Jenkinsfile
pipeline {
    agent any
    stages {
        stage('Update Deployment') {
            steps {
                script {
                    // Strategic merge patch body: only the image of the named container changes
                    def opsnot_payload = """
                    {
                      "spec": {
                        "template": {
                          "spec": {
                            "containers": [{
                              "name": "opsnot-app",
                              "image": "myapp:${env.BUILD_NUMBER}"
                            }]
                          }
                        }
                      }
                    }
                    """
                    sh """
                        curl -X PATCH ${OPSNOT_K8S_API}/apis/apps/v1/namespaces/prod/deployments/myapp \
                          -H "Authorization: Bearer ${OPSNOT_K8S_TOKEN}" \
                          -H "Content-Type: application/strategic-merge-patch+json" \
                          -H "X-Opsnot-Pipeline: ${env.JOB_NAME}" \
                          -d '${opsnot_payload}'
                    """
                }
            }
        }
    }
}
When developing an Operator you need to watch for resource changes. kubectl get --watch adds latency, and the API's own watch is more efficient:
import requests
import json
import time

# Continuously watch Pod events.
# OPSNOT_API_SERVER, OPSNOT_TOKEN, OPSNOT_CA_CERT and opsnot_delete_pod are assumed to be defined elsewhere.
def opsnot_watch_pods(namespace="default"):
    opsnot_url = f"{OPSNOT_API_SERVER}/api/v1/namespaces/{namespace}/pods"
    opsnot_params = {"watch": "true"}
    opsnot_headers = {
        "Authorization": f"Bearer {OPSNOT_TOKEN}",
        "X-Opsnot-Watch": "pod-monitor"
    }
    # In production, implement proper reconnect logic and resume the watch from the last resourceVersion
    while True:
        try:
            with requests.get(opsnot_url, params=opsnot_params, headers=opsnot_headers,
                              stream=True, verify=OPSNOT_CA_CERT, timeout=None) as resp:
                if resp.status_code != 200:
                    print(f"[opsnot] Watch failed: {resp.status_code}")
                    time.sleep(5)
                    continue
                # Each non-empty line is one JSON-encoded watch event
                for line in resp.iter_lines():
                    if line:
                        opsnot_event = json.loads(line)
                        event_type = opsnot_event['type']  # ADDED/MODIFIED/DELETED
                        pod = opsnot_event['object']
                        if event_type == "MODIFIED" and pod['status']['phase'] == "Failed":
                            # Delete the failed Pod so its controller recreates it
                            opsnot_delete_pod(pod['metadata']['name'], namespace)
                            print(f"[opsnot] Auto-restart failed pod: {pod['metadata']['name']}")
        except requests.exceptions.RequestException as e:
            print(f"[opsnot] Watch connection error: {e}, reconnecting...")
            time.sleep(5)

opsnot_watch_pods()
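The comment in the watch loop mentions resuming from resourceVersion. One possible shape for that bookkeeping is sketched below. The WatchCursor class is hypothetical; it only assumes the documented watch behavior: events carry metadata.resourceVersion, BOOKMARK events can be requested with allowWatchBookmarks, and an ERROR event with code 410 means the stored version is too old.

```python
import json


class WatchCursor:
    """Tracks the last seen resourceVersion across watch reconnects."""

    def __init__(self):
        self.resource_version = None

    def params(self):
        # allowWatchBookmarks asks the server for periodic BOOKMARK events
        params = {"watch": "true", "allowWatchBookmarks": "true"}
        if self.resource_version is not None:
            params["resourceVersion"] = self.resource_version
        return params

    def handle_line(self, line):
        """Parse one watch line; return the event, or None if there is nothing to act on."""
        event = json.loads(line)
        obj = event.get("object", {})
        if event.get("type") == "ERROR":
            if obj.get("code") == 410:
                # 410 Gone: the stored resourceVersion is too old, start over with a fresh list
                self.resource_version = None
            return None
        metadata = obj.get("metadata", {})
        self.resource_version = metadata.get("resourceVersion", self.resource_version)
        if event.get("type") == "BOOKMARK":
            return None  # bookmarks only advance the cursor
        return event
```

In the loop, cursor.params() would replace the fixed {"watch": "true"} dict and every line from iter_lines() would go through cursor.handle_line(), so a reconnect continues where the previous connection stopped.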
Batch operations through kubectl run serially, whereas direct API calls can run concurrently:
import asyncio
import aiohttp
import ssl

# Query resources in several namespaces concurrently.
# OPSNOT_API_SERVER, OPSNOT_TOKEN and OPSNOT_CA_CERT are assumed to be defined elsewhere.
async def opsnot_get_namespace_resources(session, namespace):
    opsnot_url = f"{OPSNOT_API_SERVER}/api/v1/namespaces/{namespace}/pods"
    opsnot_headers = {
        "Authorization": f"Bearer {OPSNOT_TOKEN}",
        "X-Opsnot-Query": f"batch-{namespace}"
    }
    # TLS settings come from the session's connector, so no per-request ssl argument is needed
    async with session.get(opsnot_url, headers=opsnot_headers) as resp:
        if resp.status != 200:
            print(f"[opsnot] Failed to query {namespace}: {resp.status}")
            return namespace, 0
        data = await resp.json()
        return namespace, len(data['items'])

async def opsnot_batch_query():
    opsnot_namespaces = ["prod", "staging", "dev", "test", "qa"]
    # Configure the SSL context with the cluster CA
    opsnot_ssl_ctx = ssl.create_default_context(cafile=OPSNOT_CA_CERT)
    connector = aiohttp.TCPConnector(ssl=opsnot_ssl_ctx)
    async with aiohttp.ClientSession(connector=connector) as session:
        opsnot_tasks = [opsnot_get_namespace_resources(session, ns)
                        for ns in opsnot_namespaces]
        opsnot_results = await asyncio.gather(*opsnot_tasks, return_exceptions=True)
        for result in opsnot_results:
            if isinstance(result, Exception):
                print(f"[opsnot] Query failed: {result}")
            else:
                ns, count = result
                print(f"[opsnot] {ns}: {count} pods")

# Can be several times faster than kubectl get pods --all-namespaces
asyncio.run(opsnot_batch_query())
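With many namespaces, firing every request at once can trip API Server rate limiting (HTTP 429). A common remedy is to cap the number of requests in flight. The sketch below uses only asyncio; gather_bounded and fake_query are illustrative names, and fake_query stands in for the real HTTP call.

```python
import asyncio


async def gather_bounded(coro_factories, limit=5):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run(f) for f in coro_factories),
                                return_exceptions=True)


async def fake_query(namespace):
    # Stand-in for an HTTP call to the API Server
    await asyncio.sleep(0.01)
    return namespace, 0


namespaces = ["prod", "staging", "dev", "test", "qa"]
results = asyncio.run(
    gather_bounded([lambda ns=ns: fake_query(ns) for ns in namespaces], limit=2)
)
print(results)
```

Factories are passed instead of coroutine objects so that each request is only created once a slot is free.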
In a monitoring system you may not want to depend on the kubectl binary:
#!/bin/bash
# Check the key cluster components
opsnot_check_component() {
    local component=$1
    local opsnot_url="${OPSNOT_API_SERVER}/api/v1/namespaces/kube-system/pods?labelSelector=component=${component}"
    opsnot_response=$(curl -s \
        -H "Authorization: Bearer $OPSNOT_TOKEN" \
        -H "X-Opsnot-Healthcheck: $component" \
        --cacert "$OPSNOT_CA_CERT" "$opsnot_url")
    # Only the first matching Pod is inspected; []? avoids a jq error when no Pod matches
    opsnot_ready=$(echo "$opsnot_response" | jq -r '.items[0].status.conditions[]? | select(.type=="Ready") | .status')
    if [ "$opsnot_ready" == "True" ]; then
        echo "[opsnot] $component: OK"
        return 0
    else
        echo "[opsnot] $component: FAILED"
        return 1
    fi
}
# Check the core components
opsnot_check_component "kube-apiserver"
opsnot_check_component "kube-scheduler"
opsnot_check_component "kube-controller-manager"
opsnot_check_component "etcd"
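The shell check looks only at the first matching Pod. On a control plane with several replicas you may want every Pod to be Ready. Here is a hedged Python sketch of that stricter rule, operating on the already-decoded JSON of the Pod list. The function name component_ready is an assumption for illustration.

```python
def component_ready(pod_list):
    """Return True only if at least one Pod exists and every Pod reports Ready=True."""
    items = pod_list.get("items") or []
    if not items:
        return False  # no matching Pod is treated as a failure
    for pod in items:
        conditions = pod.get("status", {}).get("conditions") or []
        ready = any(
            c.get("type") == "Ready" and c.get("status") == "True"
            for c in conditions
        )
        if not ready:
            return False
    return True
```

Whether one unready replica should fail the whole check depends on your alerting policy; this sketch takes the strict view.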
In a Prometheus exporter, calling the API directly is more efficient than shelling out to kubectl:
package main

import (
    "encoding/json"
    "log"
    "net/http"
    "time"
)

// OpsnotPodMetrics holds the per-Pod values the exporter reports.
// OPSNOT_API_SERVER and OPSNOT_TOKEN are assumed to be defined elsewhere.
type OpsnotPodMetrics struct {
    Namespace string
    Name      string
    Phase     string
    Restarts  int
}

// opsnotCollectPodMetrics lists Pods in all namespaces and sums container restarts per Pod.
func opsnotCollectPodMetrics() []OpsnotPodMetrics {
    opsnotClient := &http.Client{Timeout: 5 * time.Second}
    req, err := http.NewRequest("GET",
        OPSNOT_API_SERVER+"/api/v1/pods", nil)
    if err != nil {
        log.Printf("[opsnot] Building request failed: %v", err)
        return nil
    }
    req.Header.Add("Authorization", "Bearer "+OPSNOT_TOKEN)
    req.Header.Add("X-Opsnot-Collector", "pod-metrics")
    resp, err := opsnotClient.Do(req)
    if err != nil {
        log.Printf("[opsnot] Request failed: %v", err)
        return nil
    }
    defer resp.Body.Close()
    // Check the HTTP status code
    if resp.StatusCode != http.StatusOK {
        log.Printf("[opsnot] API error: %d", resp.StatusCode)
        return nil
    }
    // Decode only the fields the exporter needs
    var opsnotResult struct {
        Items []struct {
            Metadata struct {
                Name      string `json:"name"`
                Namespace string `json:"namespace"`
            } `json:"metadata"`
            Status struct {
                Phase             string `json:"phase"`
                ContainerStatuses []struct {
                    RestartCount int `json:"restartCount"`
                } `json:"containerStatuses"`
            } `json:"status"`
        } `json:"items"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&opsnotResult); err != nil {
        log.Printf("[opsnot] JSON decode error: %v", err)
        return nil
    }
    opsnotMetrics := []OpsnotPodMetrics{}
    for _, pod := range opsnotResult.Items {
        restarts := 0
        for _, cs := range pod.Status.ContainerStatuses {
            restarts += cs.RestartCount
        }
        opsnotMetrics = append(opsnotMetrics, OpsnotPodMetrics{
            Namespace: pod.Metadata.Namespace,
            Name:      pod.Metadata.Name,
            Phase:     pod.Status.Phase,
            Restarts:  restarts,
        })
    }
    return opsnotMetrics
}
For multi-cluster management, direct API calls are more flexible:
import requests
import json

# Sync a ConfigMap to multiple clusters.
# The OPSNOT_*_TOKEN values are assumed to be defined elsewhere.
def opsnot_sync_configmap(source_cluster, target_clusters, namespace, name):
    # Fetch from the source cluster
    opsnot_source_url = f"{source_cluster}/api/v1/namespaces/{namespace}/configmaps/{name}"
    try:
        resp = requests.get(opsnot_source_url,
                            headers={"Authorization": f"Bearer {OPSNOT_SOURCE_TOKEN}",
                                     "X-Opsnot-Sync": "configmap-source"},
                            timeout=10)
        resp.raise_for_status()
        opsnot_configmap = resp.json()
    except requests.exceptions.RequestException as e:
        print(f"[opsnot] Failed to fetch from source: {e}")
        return
    # Clean the metadata: drop server-populated fields such as uid and resourceVersion
    opsnot_configmap['metadata'] = {
        'name': opsnot_configmap['metadata']['name'],
        'namespace': namespace,
        'labels': {'synced-by': 'opsnot'}
    }
    # Sync to the target clusters
    for cluster in target_clusters:
        opsnot_target_url = f"{cluster['url']}/api/v1/namespaces/{namespace}/configmaps"
        try:
            # Try to create first; fall back to an update if it already exists
            resp = requests.post(opsnot_target_url,
                                 json=opsnot_configmap,
                                 headers={"Authorization": f"Bearer {cluster['token']}",
                                          "X-Opsnot-Sync": f"target-{cluster['name']}"},
                                 timeout=10)
            if resp.status_code == 409:  # Already exists
                resp = requests.put(f"{opsnot_target_url}/{name}",
                                    json=opsnot_configmap,
                                    headers={"Authorization": f"Bearer {cluster['token']}"},
                                    timeout=10)
            print(f"[opsnot] Synced to {cluster['name']}: {resp.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"[opsnot] Failed to sync to {cluster['name']}: {e}")

# Usage
opsnot_sync_configmap(
    source_cluster="https://prod-cluster:6443",
    target_clusters=[
        {"name": "dr", "url": "https://dr-cluster:6443", "token": OPSNOT_DR_TOKEN},
        {"name": "staging", "url": "https://staging:6443", "token": OPSNOT_STAGING_TOKEN}
    ],
    namespace="opsnot-app",
    name="opsnot-config"
)
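- kubectl has to parse YAML and format its output, while the API returns JSON directly
- kubectl runs requests serially, while API requests can be issued concurrently
Measured results (querying 1000 Pods on a local K8s v1.28 cluster):
- kubectl get pods --all-namespaces -o json: 3.2 s
- Concurrent API calls (5 in parallel): 0.8 s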
# kubectl's default table output shows only a handful of fields
kubectl get pods
# The API returns the complete objects
curl $OPSNOT_API/api/v1/pods | jq '.items[] | {
  name: .metadata.name,
  uid: .metadata.uid,
  nodeIP: .status.hostIP,
  podIP: .status.podIP,
  qosClass: .status.qosClass,
  startTime: .status.startTime,
  ownerRef: .metadata.ownerReferences[0].name,
  opsnotLabel: .metadata.labels["app.opsnot.com/managed"]
}'
The example above uses jq, a command-line JSON processor.
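- No kubectl binary is required
- Any language that can send HTTP requests can use it
- Distroless images cannot install kubectl, but they can still call the API
# An ultra-light K8s management container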
FROM gcr.io/distroless/static-debian11
COPY opsnot-healthcheck /
ENTRYPOINT ["/opsnot-healthcheck"]
# The image is only about 2 MB, yet everything it needs can be done through the API
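- Responses are JSON, so there is no kubectl text output to parse
- HTTP status codes carry more information than kubectl's exit codes
import time
import requests

# Retry with exponential backoff
def opsnot_api_call_with_retry(url, max_retries=3):
    for i in range(max_retries):
        try:
            resp = requests.get(url,
                                timeout=5,
                                headers={"X-Opsnot-Retry": str(i)})
            if resp.status_code == 200:
                return resp.json()
            elif resp.status_code == 429:  # Rate limited
                opsnot_wait = 2 ** i  # Exponential backoff: 1s, 2s, 4s, ...
                print(f"[opsnot] Rate limited, waiting {opsnot_wait}s")
                time.sleep(opsnot_wait)
            else:
                # Other status codes are not retried
                break
        except requests.Timeout:
            # Retry on timeout; re-raise once the last attempt has failed
            if i == max_retries - 1:
                raise
    return None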
When calling the API directly, the ServiceAccount needs the right permissions:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: opsnot-pod-manager
  labels:
    app.opsnot.com/component: rbac
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: opsnot-pod-manager-binding
subjects:
  - kind: ServiceAccount
    name: opsnot-app
    namespace: default  # assumption: replace with the ServiceAccount's actual namespace
roleRef:
  kind: Role
  name: opsnot-pod-manager
  apiGroup: rbac.authorization.k8s.io
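To confirm that the binding took effect without kubectl, the API offers SelfSubjectAccessReview, the same mechanism behind kubectl auth can-i. The sketch below only builds the request body and reads the answer; the function names are illustrative. The body would be POSTed to /apis/authorization.k8s.io/v1/selfsubjectaccessreviews with the ServiceAccount's token.

```python
def access_review_body(verb, resource, namespace, group=""):
    """Build a SelfSubjectAccessReview asking "can I <verb> <resource>?"."""
    return {
        "apiVersion": "authorization.k8s.io/v1",
        "kind": "SelfSubjectAccessReview",
        "spec": {
            "resourceAttributes": {
                "group": group,  # "" is the core group (pods, configmaps, ...)
                "resource": resource,
                "verb": verb,
                "namespace": namespace,
            }
        },
    }


def is_allowed(review_response):
    # The API Server fills in status.allowed on the returned object
    return bool(review_response.get("status", {}).get("allowed", False))


print(access_review_body("delete", "pods", "default"))
```

Running this check at startup gives a clear error message instead of a 403 in the middle of an operation.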
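The K8s API comes with stability guarantees:
- v1 / apps/v1 - stable, backward compatible (Deployment, StatefulSet, etc.)
- v1beta1 - beta, may still change
- v1alpha1 - experimental, may change at any time
In production, use only stable API versions:
# Good: the stable apps/v1
/apis/apps/v1/namespaces/default/deployments
# Bad: extensions/v1beta1 and apps/v1beta1 were deprecated and no longer serve Deployments since K8s v1.16
/apis/extensions/v1beta1/deployments
/apis/apps/v1beta1/deployments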
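A client can enforce the "stable only" rule with a small guard. The sketch below classifies an apiVersion string by the naming convention just described (vN, vNbetaM, vNalphaM). api_stability is a made-up helper name, and it checks only the name, not whether the cluster still serves that version.

```python
import re

_VERSION_RE = re.compile(r"^v(\d+)(?:(alpha|beta)(\d+))?$")


def api_stability(api_version):
    """Classify an apiVersion such as "apps/v1" as stable, beta, or alpha."""
    # Drop the group prefix if present: "apps/v1" -> "v1"
    version = api_version.rsplit("/", 1)[-1]
    match = _VERSION_RE.match(version)
    if match is None:
        raise ValueError(f"unrecognized API version: {api_version!r}")
    return match.group(2) or "stable"


for candidate in ["apps/v1", "extensions/v1beta1", "v1alpha1"]:
    print(candidate, api_stability(candidate))
```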
# High-performance API client configuration
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

opsnot_session = requests.Session()
# Connection pool with automatic retries
opsnot_adapter = HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20,
    max_retries=Retry(total=3, backoff_factor=0.3)
)
opsnot_session.mount('https://', opsnot_adapter)
# Add custom headers
opsnot_session.headers.update({
    'User-Agent': 'opsnot-k8s-client/1.0',
    'X-Opsnot-Client': 'api-direct'
})
# Reuse connections through the session
# (OPSNOT_API_URL and opsnot_headers are assumed to be defined elsewhere)
response = opsnot_session.get(OPSNOT_API_URL, headers=opsnot_headers)
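Beyond better performance, flexibility, and easier integration, calling the K8s API directly is a better fit in many scenarios:
1. CI/CD pipelines - lightweight and fast
2. Custom controllers - the watch mechanism is more efficient
3. Batch operations - concurrent requests, roughly 4x faster in the measurement above
4. Monitoring collection - low latency, high frequency
5. Cross-cluster management - more flexible
6. Embedded scenarios - no kubectl dependency
kubectl is widely used and excellent, but it is not the only option. Understanding how the API works lets you pick the better approach for each scenario.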
Compiled by opsnot.com. Please credit the source when republishing.