Go:client-go

时间:April 11, 2021 分类:

目录:

初始化Client

添加数据

参考

Create(ctx context.Context, endpoints *v1.Endpoints, opts metav1.CreateOptions) (*v1.Endpoints, error)

详细代码

    job := &batch_v1.Job{}
    job.Name = contaner_name
    job.Spec.Completions = &completion
    // 对于这种需要单独实例化
    container := core_v1.Container{
        Name:            contaner_name,
        Image:           image,
        Args:            []string{hostname, idc, ip},
        ImagePullPolicy: "Always",
    }
    job.Spec.Template.Spec.Containers = append(job.Spec.Template.Spec.Containers, container)
    job.Spec.Template.Spec.RestartPolicy = "OnFailure"
    _, err := kubectl.BatchV1().Jobs("cronjob").Create(context.TODO(), job, meta_v1.CreateOptions{})
    if err != nil {
        glog.Errorf("Node expansion create job ns: cronjob, name: %s, err: %s", contaner_name, err.Error())
    } else {
        glog.Infof("Node expansion create job ns: cronjob, name: %s", contaner_name)
    }

获取数据

获取数据分Get和List

Get

参考

Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Endpoints, error)

详细代码

    svc, err := kubeclient.CoreV1().Services(namespace).Get(context.TODO(), servicename, meta_v1.GetOptions{})
    if err != nil {
        glog.Error(fmt.Sprintf("获取集群的svc失败,svc: %s, err: %s", servicename, err.Error()))
    } else {
        glog.Info(fmt.Sprintf("获取集群的svc成功,svc: %s", servicename))
    }

List

参考

List(ctx context.Context, opts metav1.ListOptions) (*v1.EndpointsList, error)

meta_v1.GetOptions{}可以添加查询的条件

详细代码

nodelist, err := toClient.CoreV1().Nodes().List(context.TODO(),
        meta_v1.ListOptions{
            LabelSelector: "node-role.kubernetes.io/ingress=true",
        },
    )

更新数据

这里有Patch和Update之分

Update

更新需要在原来的数据基础之上

详细代码

    // 删除一个svc的selector
    delete(svc.Spec.Selector, "env")
    // 清空
    // svc.Spec.Selector = make(map[string]string)
    _, err = toClient.CoreV1().Services(namespace).Update(context.TODO(), svc, meta_v1.UpdateOptions{})
    if err != nil {
        fmt.Println(fmt.Sprintf("更新svc失败,svc: %s, err: %s", svc, err.Error()))
        os.Exit(1)
    } else {
        fmt.Println(fmt.Sprintf("更新svc成功,svc: %s", svc))
    }

Patch

补丁直接提交需要修改的数据,所以对于需要删除一些数据的功能是无法实现的

详细代码

    tsSelector = make(map[string]string)
    tsSelector["env"] = "online"
    tsPatchData := map[string]interface{}{
        "spec": map[string]interface{}{
            "selector": tsSelector,
        },
    }
    tsPausePatchBytes, _ := json.Marshal(tsPausePatchData)
    _, err = toClient.CoreV1().Services(namespace).Patch(context.TODO(), pauseService, types.MergePatchType, tsPausePatchBytes, meta_v1.PatchOptions{})

也可以直接写字符串的json数据,通过[]byte()转换成bytes

删除数据

参考

Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error

describe

pod的event

代码摘自kubectl的describe pod的event代码

func GetPodDescribeByPodName(ctx context.Context, namespace string, podname string, cluster string) (model.PodDescribes, error) {
    client, exists := r.ClientMap[cluster]
    if !exists {
        return nil, errors.Errorf("kube client not found for %s", cluster)
    }
    pod, err := client.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    var events *apiv1.EventList
    if ref, err := reference.GetReference(scheme.Scheme, pod); err != nil {
        return nil, errors.New(fmt.Sprintf("Unable to construct reference to '%#v': %v", pod, err))
    } else {
        ref.Kind = ""
        if _, isMirrorPod := pod.Annotations[apiv1.MirrorPodAnnotationKey]; isMirrorPod {
            ref.UID = types.UID(pod.Annotations[apiv1.MirrorPodAnnotationKey])
        }
        events, err = client.CoreV1().Events(namespace).Search(scheme.Scheme, ref)
        if err != nil {
            return nil, err
        }
    }
    describeList := []model.DescribeMessage{}
    for _, e := range events.Items {
        var interval string
        if e.Count > 1 {
            interval = fmt.Sprintf("%s (x%d over %s)", translateTimestampSince(e.LastTimestamp), e.Count, translateTimestampSince(e.FirstTimestamp))
        } else {
            interval = translateTimestampSince(e.FirstTimestamp)
        }
        describeList = append(describeList, model.DescribeMessage{
            Type:    e.Type,
            Reason:  e.Reason,
            Age:     interval,
            From:    formatEventSource(e.Source),
            Message: strings.TrimSpace(e.Message),
        })
        // fmt.Sprintf("%v\t%v\t%s\t%v\t%v\n", e.Type, e.Reason, interval, formatEventSource(e.Source), strings.TrimSpace(e.Message)))
    }
    return describeList, nil

node的资源分配情况

代码摘自kubectl的describe node的代码

        if strings.Contains(node.Name, "master") {
            continue
        }
        fieldSelector, err := fields.ParseSelector("spec.nodeName=" + node.Name + ",status.phase!=" + string(core_v1.PodSucceeded) + ",status.phase!=" + string(core_v1.PodFailed))
        if err != nil {
            return err
        }
        nodeNonTerminatedPodsList, err := kubectl.CoreV1().Pods(meta_v1.NamespaceAll).List(context.TODO(), meta_v1.ListOptions{FieldSelector: fieldSelector.String()})
        allocatable := node.Status.Capacity
        if len(node.Status.Allocatable) > 0 {
            allocatable = node.Status.Allocatable
        }
        //reqs, limits := getPodsTotalRequestsAndLimits(nodeNonTerminatedPodsList)
        reqs, _ := getPodsTotalRequestsAndLimits(nodeNonTerminatedPodsList)
        //cpuReqs, cpuLimits, memoryReqs, memoryLimits := reqs[core_v1.ResourceCPU], limits[core_v1.ResourceCPU], reqs[core_v1.ResourceMemory], limits[core_v1.ResourceMemory]
        cpuReqs, memoryReqs := reqs[core_v1.ResourceCPU], reqs[core_v1.ResourceMemory]
        // CPU
        cpuallocatable := allocatable.Cpu().MilliValue()
        if cpuallocatable != 0 {
            CpuAllocatable(node.Name, cpuallocatable)
            CpuRequest(node.Name, cpuReqs.MilliValue())
            // fractionCpuLimits = float64(cpuLimits.MilliValue()) / float64(allocatable.Cpu().MilliValue()) * 100
        } else {
            errMsg = append(errMsg, fmt.Sprintf("%s cpuallocatable = 0", node.Name))
        }
        // MEM
        memallocatable := allocatable.Memory().Value()
        if memallocatable != 0 {
            MemAllocatable(node.Name, memallocatable)
            MemRequest(node.Name, memoryReqs.Value())
            // fractionMemoryLimits = float64(memoryLimits.Value()) / float64(allocatable.Memory().Value()) * 100
        } else {
            errMsg = append(errMsg, fmt.Sprintf("%s memallocatable = 0", node.Name))
        }

log

代码摘自kubectl log

func PodLogsByPodName(ctx context.Context, namespace string, podname string, cluster string) (podlog string, err error) {
    client, exists := r.ClientMap[cluster]
    if !exists {
        err = errors.Errorf("kube client not found for %s", cluster)
        return
    }

    // -n 200
    var lines int64 = 200
    podlogOptions := &apiv1.PodLogOptions{
        TailLines: &lines,
    }
    restclient_request := client.CoreV1().Pods(namespace).GetLogs(podname, podlogOptions)
    podLogs, err := restclient_request.Stream(ctx)
    if err != nil {
        return
    }
    defer podLogs.Close()
    buf := new(bytes.Buffer)
    _, err = io.Copy(buf, podLogs)
    if err != nil {
        return
    }
    podlog = buf.String()
    return
}

pod重启

原理使用了exec执行kill 完成

func RestartPodByPodName(ctx context.Context, namespace string, podname string, cluster string) (err error) {
    var stdin, stdout, stderr bytes.Buffer
    client, exists := r.ClientMap[cluster]
    if !exists {
        return errors.Errorf("kube client not found for %s", cluster)
    }
    restclient := r.RestclientMap[cluster]
    pod, err := client.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
    if err != nil {
        return err
    }
    //restart := pod.Status.ContainerStatuses[0].RestartCount
    containerName := pod.Spec.Containers[0].Name
    command := []string{"kill", "1"}
    req := client.CoreV1().RESTClient().Post().Resource("pods").Name(podname).Namespace(namespace).SubResource("exec").VersionedParams(
        &apiv1.PodExecOptions{
            Container: containerName,
            Command:   command,
            Stdout:    true,
            Stderr:    true,
            Stdin:     true,
            TTY:       true,
        }, scheme.ParameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(restclient, "POST", req.URL())
    if err != nil {
        return err
    }
    err = exec.Stream(
        remotecommand.StreamOptions{
            Stdin:  &stdin,
            Stderr: &stderr,
            Stdout: &stdout,
            Tty:    true,
        },
    )
    //pod, err := client.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
    //if err != nil {
    //  return err
    //}
    return err
}

informer

参考https://blog.whysdomain.com/blog/333/

其他

获取Node对应的IP地址

func node2ip(n *core_v1.Node) (string, error) {
    for _, i := range n.Status.Addresses {
        if i.Type == "InternalIP" {
            return i.Address, nil
        }
    }
    return "", errors.New("没有获取到InternalIP")
}