Go:client-go
Contents:
Initializing the client
Creating resources
Getting resources
Updating resources
Deleting resources
describe
log
Restarting a pod
informer
Other

Initializing the client
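A minimal sketch of the usual out-of-cluster initialization (the helper name newClient is an assumption; inside a cluster, rest.InClusterConfig() replaces the kubeconfig step). The clientset variables used in the snippets below (kubectl, kubeclient, toClient) are all clientsets of this kind.

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// newClient builds a typed clientset from a kubeconfig file on disk.
func newClient(kubeconfig string) (*kubernetes.Clientset, error) {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	// The clientset bundles the typed clients (CoreV1, BatchV1, ...) used below.
	return kubernetes.NewForConfig(config)
}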
Creating resources
Reference
Create(ctx context.Context, endpoints *v1.Endpoints, opts metav1.CreateOptions) (*v1.Endpoints, error)
Every typed client exposes the same Create shape; Endpoints is shown above, while the example below creates a Job.
Example code
// kubectl is an initialized *kubernetes.Clientset; completion (int32), image,
// hostname, idc and ip are supplied by the caller.
job := &batch_v1.Job{}
job.Name = containerName
job.Spec.Completions = &completion
// Nested objects such as the container need to be instantiated separately
container := core_v1.Container{
	Name:            containerName,
	Image:           image,
	Args:            []string{hostname, idc, ip},
	ImagePullPolicy: "Always",
}
job.Spec.Template.Spec.Containers = append(job.Spec.Template.Spec.Containers, container)
job.Spec.Template.Spec.RestartPolicy = "OnFailure"
_, err := kubectl.BatchV1().Jobs("cronjob").Create(context.TODO(), job, meta_v1.CreateOptions{})
if err != nil {
	glog.Errorf("Node expansion create job ns: cronjob, name: %s, err: %s", containerName, err.Error())
} else {
	glog.Infof("Node expansion create job ns: cronjob, name: %s", containerName)
}
Getting resources
Reads come in two forms: Get for a single object and List for a collection.
Get
Reference
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Endpoints, error)
Example code
svc, err := kubeclient.CoreV1().Services(namespace).Get(context.TODO(), servicename, meta_v1.GetOptions{})
if err != nil {
	glog.Errorf("failed to get svc %s: %s", servicename, err.Error())
} else {
	glog.Infof("got svc %s", servicename)
}
List
Reference
List(ctx context.Context, opts metav1.ListOptions) (*v1.EndpointsList, error)
Query conditions such as label or field selectors go in meta_v1.ListOptions{}.
Example code
nodelist, err := toClient.CoreV1().Nodes().List(context.TODO(),
	meta_v1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/ingress=true",
	},
)
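The result is consumed through its Items field; a minimal sketch reusing nodelist from above:

if err != nil {
	glog.Errorf("failed to list ingress nodes: %s", err.Error())
} else {
	for _, node := range nodelist.Items {
		// node is a core_v1.Node value, not a pointer
		fmt.Println(node.Name)
	}
}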
Updating resources
There are two ways to modify an object: Patch and Update.
Update
Update must start from the existing object: Get it first, modify it, then submit the whole object back.
Example code
// Remove one key from the svc's selector (svc was fetched with Get as above).
delete(svc.Spec.Selector, "env")
// To clear the selector entirely:
// svc.Spec.Selector = make(map[string]string)
_, err = toClient.CoreV1().Services(namespace).Update(context.TODO(), svc, meta_v1.UpdateOptions{})
if err != nil {
	fmt.Printf("failed to update svc %s: %s\n", svc.Name, err.Error())
	os.Exit(1)
} else {
	fmt.Printf("updated svc %s\n", svc.Name)
}
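Because Update submits the object together with its resourceVersion, a concurrent writer causes a conflict error. client-go ships a standard retry helper for this (k8s.io/client-go/util/retry); a minimal sketch, with namespace and servicename as above:

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
	// Re-read the latest version of the object on every attempt.
	svc, getErr := toClient.CoreV1().Services(namespace).Get(context.TODO(), servicename, meta_v1.GetOptions{})
	if getErr != nil {
		return getErr
	}
	delete(svc.Spec.Selector, "env")
	_, updateErr := toClient.CoreV1().Services(namespace).Update(context.TODO(), svc, meta_v1.UpdateOptions{})
	return updateErr
})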
Patch
A patch submits only the fields that need to change, so merging a map this way only adds or overwrites keys; to remove an existing key, a JSON merge patch sets its value to null (see the note after the example).
Example code
tsSelector := make(map[string]string)
tsSelector["env"] = "online"
tsPatchData := map[string]interface{}{
	"spec": map[string]interface{}{
		"selector": tsSelector,
	},
}
tsPatchBytes, _ := json.Marshal(tsPatchData)
_, err = toClient.CoreV1().Services(namespace).Patch(context.TODO(), pauseService, types.MergePatchType, tsPatchBytes, meta_v1.PatchOptions{})
You can also write the JSON payload as a string literal and convert it with []byte().
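For example, equivalent to the map above, and, as noted, with types.MergePatchType a null value deletes the key:

patchBytes := []byte(`{"spec":{"selector":{"env":"online"}}}`)
// null removes the "env" key from the selector:
removeBytes := []byte(`{"spec":{"selector":{"env":null}}}`)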
Deleting resources
Reference
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
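A minimal sketch for symmetry with the sections above (namespace and the kubectl clientset are as in the Job example; jobName is illustrative):

// Background propagation makes the garbage collector clean up the Job's pods too.
propagation := meta_v1.DeletePropagationBackground
err := kubectl.BatchV1().Jobs(namespace).Delete(context.TODO(), jobName, meta_v1.DeleteOptions{
	PropagationPolicy: &propagation,
})
if err != nil {
	glog.Errorf("failed to delete job %s: %s", jobName, err.Error())
}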
describe
Pod events
The code below is adapted from the event-printing part of kubectl describe pod; the helpers translateTimestampSince and formatEventSource are copied from the same kubectl code.
// Receiver type assumed: r.ClientMap maps a cluster name to its *kubernetes.Clientset,
// and model.PodDescribes is assumed to be []model.DescribeMessage.
func (r *Resource) GetPodDescribeByPodName(ctx context.Context, namespace string, podname string, cluster string) (model.PodDescribes, error) {
	client, exists := r.ClientMap[cluster]
	if !exists {
		return nil, errors.Errorf("kube client not found for %s", cluster)
	}
	pod, err := client.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	var events *apiv1.EventList
	if ref, err := reference.GetReference(scheme.Scheme, pod); err != nil {
		return nil, errors.Errorf("unable to construct reference to '%#v': %v", pod, err)
	} else {
		// Events are matched on the involved object; clearing Kind widens the search,
		// and mirror pods are looked up by the UID stored in their annotation.
		ref.Kind = ""
		if _, isMirrorPod := pod.Annotations[apiv1.MirrorPodAnnotationKey]; isMirrorPod {
			ref.UID = types.UID(pod.Annotations[apiv1.MirrorPodAnnotationKey])
		}
		events, err = client.CoreV1().Events(namespace).Search(scheme.Scheme, ref)
		if err != nil {
			return nil, err
		}
	}
	describeList := []model.DescribeMessage{}
	for _, e := range events.Items {
		// Repeated events are rendered the way kubectl does: "age (xN over first-seen)".
		var interval string
		if e.Count > 1 {
			interval = fmt.Sprintf("%s (x%d over %s)", translateTimestampSince(e.LastTimestamp), e.Count, translateTimestampSince(e.FirstTimestamp))
		} else {
			interval = translateTimestampSince(e.FirstTimestamp)
		}
		describeList = append(describeList, model.DescribeMessage{
			Type:    e.Type,
			Reason:  e.Reason,
			Age:     interval,
			From:    formatEventSource(e.Source),
			Message: strings.TrimSpace(e.Message),
		})
	}
	return describeList, nil
}
Node resource allocation
The code below is adapted from kubectl describe node; it runs inside a loop over the nodes (master nodes are skipped), with getPodsTotalRequestsAndLimits copied from the same kubectl code.
// nodelist comes from a Nodes().List call such as the one above.
for _, node := range nodelist.Items {
	if strings.Contains(node.Name, "master") {
		continue
	}
	// Select the node's pods that are neither Succeeded nor Failed, as kubectl does.
	fieldSelector, err := fields.ParseSelector("spec.nodeName=" + node.Name + ",status.phase!=" + string(core_v1.PodSucceeded) + ",status.phase!=" + string(core_v1.PodFailed))
	if err != nil {
		return err
	}
	nodeNonTerminatedPodsList, err := kubectl.CoreV1().Pods(meta_v1.NamespaceAll).List(context.TODO(), meta_v1.ListOptions{FieldSelector: fieldSelector.String()})
	if err != nil {
		return err
	}
	// Fall back to Capacity when Allocatable is not reported.
	allocatable := node.Status.Capacity
	if len(node.Status.Allocatable) > 0 {
		allocatable = node.Status.Allocatable
	}
	// reqs, limits := getPodsTotalRequestsAndLimits(nodeNonTerminatedPodsList)
	reqs, _ := getPodsTotalRequestsAndLimits(nodeNonTerminatedPodsList)
	cpuReqs, memoryReqs := reqs[core_v1.ResourceCPU], reqs[core_v1.ResourceMemory]
	// CPU
	cpuallocatable := allocatable.Cpu().MilliValue()
	if cpuallocatable != 0 {
		// CpuAllocatable/CpuRequest (and the MEM variants) are metric-reporting
		// helpers defined elsewhere in the author's code.
		CpuAllocatable(node.Name, cpuallocatable)
		CpuRequest(node.Name, cpuReqs.MilliValue())
	} else {
		errMsg = append(errMsg, fmt.Sprintf("%s cpuallocatable = 0", node.Name))
	}
	// MEM
	memallocatable := allocatable.Memory().Value()
	if memallocatable != 0 {
		MemAllocatable(node.Name, memallocatable)
		MemRequest(node.Name, memoryReqs.Value())
	} else {
		errMsg = append(errMsg, fmt.Sprintf("%s memallocatable = 0", node.Name))
	}
}
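kubectl additionally reports these figures as percentages; the commented-out lines above correspond to computations of this shape:

fractionCpuReqs := float64(cpuReqs.MilliValue()) / float64(allocatable.Cpu().MilliValue()) * 100
fractionMemoryReqs := float64(memoryReqs.Value()) / float64(allocatable.Memory().Value()) * 100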
log
The code below is adapted from kubectl logs.
// Receiver type assumed, as above.
func (r *Resource) PodLogsByPodName(ctx context.Context, namespace string, podname string, cluster string) (podlog string, err error) {
	client, exists := r.ClientMap[cluster]
	if !exists {
		err = errors.Errorf("kube client not found for %s", cluster)
		return
	}
	// Equivalent to kubectl logs --tail=200.
	var lines int64 = 200
	podlogOptions := &apiv1.PodLogOptions{
		TailLines: &lines,
	}
	req := client.CoreV1().Pods(namespace).GetLogs(podname, podlogOptions)
	podLogs, err := req.Stream(ctx)
	if err != nil {
		return
	}
	defer podLogs.Close()
	// Drain the stream into a buffer and return it as one string.
	buf := new(bytes.Buffer)
	_, err = io.Copy(buf, podLogs)
	if err != nil {
		return
	}
	podlog = buf.String()
	return
}
Restarting a pod
The trick is to exec into the container and kill PID 1; the kubelet then restarts the container according to the pod's restart policy.
// Receiver type assumed: r.RestclientMap maps a cluster name to its *rest.Config.
func (r *Resource) RestartPodByPodName(ctx context.Context, namespace string, podname string, cluster string) (err error) {
	var stdin, stdout, stderr bytes.Buffer
	client, exists := r.ClientMap[cluster]
	if !exists {
		return errors.Errorf("kube client not found for %s", cluster)
	}
	restclient := r.RestclientMap[cluster]
	pod, err := client.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
	if err != nil {
		return err
	}
	// Exec runs in the pod's first container.
	containerName := pod.Spec.Containers[0].Name
	command := []string{"kill", "1"}
	req := client.CoreV1().RESTClient().Post().Resource("pods").Name(podname).Namespace(namespace).SubResource("exec").VersionedParams(
		&apiv1.PodExecOptions{
			Container: containerName,
			Command:   command,
			Stdout:    true,
			Stderr:    true,
			Stdin:     true,
			TTY:       true,
		}, scheme.ParameterCodec)
	exec, err := remotecommand.NewSPDYExecutor(restclient, "POST", req.URL())
	if err != nil {
		return err
	}
	err = exec.Stream(
		remotecommand.StreamOptions{
			Stdin:  &stdin,
			Stderr: &stderr,
			Stdout: &stdout,
			Tty:    true,
		},
	)
	return err
}
informer
Reference: https://blog.whysdomain.com/blog/333/
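The linked post covers informers in depth; as a quick taste, a minimal sketch using a SharedInformerFactory from k8s.io/client-go/informers and k8s.io/client-go/tools/cache (the clientset variable and the 30s resync period are assumptions):

factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods().Informer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		pod := obj.(*core_v1.Pod)
		glog.Infof("pod added: %s/%s", pod.Namespace, pod.Name)
	},
})
stopCh := make(chan struct{})
defer close(stopCh)
// Start the informers and wait for the initial cache sync before relying on them.
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)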
Other
Getting a node's IP address
func node2ip(n *core_v1.Node) (string, error) {
	// A node reports several addresses; pick the InternalIP entry.
	for _, addr := range n.Status.Addresses {
		if addr.Type == core_v1.NodeInternalIP {
			return addr.Address, nil
		}
	}
	return "", errors.New("no InternalIP found")
}