如何编写一个CSI驱动项目,k8s csi驱动开发

原创 吴就业 101 0 2024-05-24

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://www.wujiuye.com/article/b7b6a87848a34b79abc48c9bd17ed00a

作者:吴就业
链接:https://www.wujiuye.com/article/b7b6a87848a34b79abc48c9bd17ed00a
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

关于持久卷的创建和挂载原理的理解,可查看上一篇文章:Kubernetes CSI插件中持久卷的挂载和映射机制

CSI驱动的接口

CSI驱动必不可少的几个组件:

其中csi-provisioner和csi-node-driver-registrar都是k8s提供好的组件,不需要我们开发,但需要部署的时候使用。可查看本篇文章最后给出的部署案例。(还有其它一些非必需组件,可以查看官方文档)

controller组件和node组件就是需要我们开发的,其实就是实现几个gRPC接口。

Controller组件和Node组件都要实现的接口:

type IdentityServer interface {
	GetPluginInfo(context.Context, *GetPluginInfoRequest) (*GetPluginInfoResponse, error)
	GetPluginCapabilities(context.Context, *GetPluginCapabilitiesRequest) (*GetPluginCapabilitiesResponse, error)
	Probe(context.Context, *ProbeRequest) (*ProbeResponse, error)
}

Controller组件需要实现的接口:

type ControllerServer interface {
	CreateVolume(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
	DeleteVolume(context.Context, *DeleteVolumeRequest) (*DeleteVolumeResponse, error)
	ControllerPublishVolume(context.Context, *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
	ControllerUnpublishVolume(context.Context, *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
	ValidateVolumeCapabilities(context.Context, *ValidateVolumeCapabilitiesRequest) (*ValidateVolumeCapabilitiesResponse, error)
	ListVolumes(context.Context, *ListVolumesRequest) (*ListVolumesResponse, error)
	GetCapacity(context.Context, *GetCapacityRequest) (*GetCapacityResponse, error)
	ControllerGetCapabilities(context.Context, *ControllerGetCapabilitiesRequest) (*ControllerGetCapabilitiesResponse, error)
	CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
	DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*DeleteSnapshotResponse, error)
	ListSnapshots(context.Context, *ListSnapshotsRequest) (*ListSnapshotsResponse, error)
	ControllerExpandVolume(context.Context, *ControllerExpandVolumeRequest) (*ControllerExpandVolumeResponse, error)
	ControllerGetVolume(context.Context, *ControllerGetVolumeRequest) (*ControllerGetVolumeResponse, error)
	ControllerModifyVolume(context.Context, *ControllerModifyVolumeRequest) (*ControllerModifyVolumeResponse, error)
}

Controller组件至少可以只实现ControllerServer的这几个接口:

  var (
  	controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
  		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
  	}
  )
  
  func (d *controllerService) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
  	klog.V(5).Infof("grpc ControllerGetCapabilities...")
  	var caps []*csi.ControllerServiceCapability
  	for _, _cap := range controllerCaps {
  		c := &csi.ControllerServiceCapability{
  			Type: &csi.ControllerServiceCapability_Rpc{
  				Rpc: &csi.ControllerServiceCapability_RPC{
  					Type: _cap,
  				},
  			},
  		}
  		caps = append(caps, c)
  	}
  	return &csi.ControllerGetCapabilitiesResponse{Capabilities: caps}, nil
  }

Node组件需要实现的接口:

type NodeServer interface {
	NodeStageVolume(context.Context, *NodeStageVolumeRequest) (*NodeStageVolumeResponse, error)
	NodeUnstageVolume(context.Context, *NodeUnstageVolumeRequest) (*NodeUnstageVolumeResponse, error)
	NodePublishVolume(context.Context, *NodePublishVolumeRequest) (*NodePublishVolumeResponse, error)
	NodeUnpublishVolume(context.Context, *NodeUnpublishVolumeRequest) (*NodeUnpublishVolumeResponse, error)
	NodeGetVolumeStats(context.Context, *NodeGetVolumeStatsRequest) (*NodeGetVolumeStatsResponse, error)
	NodeExpandVolume(context.Context, *NodeExpandVolumeRequest) (*NodeExpandVolumeResponse, error)
	NodeGetCapabilities(context.Context, *NodeGetCapabilitiesRequest) (*NodeGetCapabilitiesResponse, error)
	NodeGetInfo(context.Context, *NodeGetInfoRequest) (*NodeGetInfoResponse, error)
}

Node组件至少可以只实现NodeServer的这几个接口:

但在上一篇文章,我们debug gcp-filestore-csi-driver 这个csi驱动的时候,发现在调用NodePublishVolume之前调用了NodeStageVolume,先将卷挂载在节点上,然后多个相同Pod调度到相同Node上时,可以都直接挂载这个目录使用。

如果需要NodeStageVolume被调用,那么我们至少还需要再实现这几个接口:

然后NodeGetCapabilities的实现是这样,返回支持NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME:

func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	response := &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
					},
				},
			},
		},
	}
	return response, nil
}

卷的挂载流程

卷的挂载流程可查看上篇文章:Kubernetes CSI插件中持久卷的挂载和映射机制

总结就是:

  1. K8s(csi-provisioner)先调用controller组件的CreateVolume来为PVC创建一个持久卷,此方法调用结束后,PV资源被创建(由k8s创建)。
  2. kubelet调用Node组件的NodeStageVolume来将持久卷(PV)挂载到Node上,挂载到一个全局目录,目的是让调度在此节点上的使用相同PVC的Pod都可以直接挂载这个目录即可。
  3. kubelet调用Node组件的NodePublishVolume来将持久卷(PV)挂载在Node上的全局目录挂载到Pod的容器卷目录。

卷的卸载流程

我们还是以 gcp-filestore-csi-driver 这个csi驱动为例,来debug理解卷的卸载流程。

当有Pod引用该PVC的时候,并不能删除PVC,也不能删除PV。只有先删除所有引用了该PVC的Pod,才能删除PVC。

当Pod被删除,kubelet会调用Pod所在的Node上的Node组件的NodeUnpublishVolume方法,卸载容器卷的挂载。

# 调用Node组件的NodeUnpublishVolume方法,unmount
I0524 03:49:11.869841       1 utils.go:55] GRPC call: /csi.v1.Node/NodeUnpublishVolume
I0524 03:49:11.870357       1 utils.go:56] GRPC request: {"target_path":"/var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount","volume_id":"modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1"}
I0524 03:49:11.873596       1 mount_helper_common.go:93] unmounting "/var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount" (corruptedMount: false, mounterCanSkipMountPointChecks: true)
I0524 03:49:11.874286       1 mount_linux.go:360] Unmounting /var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount
I0524 03:49:11.926782       1 mount_helper_common.go:150] Warning: deleting path "/var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount"
I0524 03:49:11.927006       1 utils.go:61] GRPC response: 

然后,kubelet先调用一次NodeGetCapabilities方法。从日记可以看出,NodeGetCapabilities方法返回了支持STAGE_UNSTAGE_VOLUME,所以接着kubelet就会继续调用NodeUnstageVolume方法来卸载持久卷(PV)在Node上的挂载。

# 调用Node组件的NodeGetCapabilities方法
I0524 03:49:11.973778       1 utils.go:55] GRPC call: /csi.v1.Node/NodeGetCapabilities
I0524 03:49:11.973913       1 utils.go:56] GRPC request: {}
I0524 03:49:11.973939       1 utils.go:61] GRPC response: capabilities:<rpc:<type:STAGE_UNSTAGE_VOLUME > > capabilities:<rpc:<type:GET_VOLUME_STATS > > 

# 调用NodeUnstageVolume方法
I0524 03:49:11.975045       1 utils.go:55] GRPC call: /csi.v1.Node/NodeUnstageVolume
I0524 03:49:11.975197       1 utils.go:56] GRPC request: {"staging_target_path":"/var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount","volume_id":"modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1"}
I0524 03:49:11.975285       1 mount_helper_common.go:93] unmounting "/var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount" (corruptedMount: false, mounterCanSkipMountPointChecks: true)
I0524 03:49:11.975331       1 mount_linux.go:360] Unmounting /var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount
I0524 03:49:15.001737       1 mount_helper_common.go:150] Warning: deleting path "/var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount"
I0524 03:49:15.003393       1 node.go:375] NodeUnstageVolume succeeded on volume modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1 from staging target path /var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount
I0524 03:49:15.003460       1 utils.go:61] GRPC response:

当没有Node挂载此持久卷(PV)的时候,我们就可以删除PVC了,当PVC删除的时候,k8s就会调用controller组件的DeleteVolume方法来删除持久卷(PV)。

I0524 03:49:12.881989       1 controller.go:373] DeleteVolume called with request volume_id:"modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1" 
I0524 03:49:13.046257       1 file.go:326] GetInstance call fetched instance &{CapacityGb:1024 CapacityStepSizeGb:1 CreateTime:2024-05-20T09:46:02.447035850Z Description: DirectoryServices:<nil> Etag: FileShares:[0xc000475b20] KmsKeyName: Labels:map[kubernetes_io_created-for_pv_name:pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa kubernetes_io_created-for_pvc_name:test-demo-pvc kubernetes_io_created-for_pvc_namespace:default storage_gke_io_created-by:filestore_csi_storage_gke_io] MaxCapacityGb:65434 MaxShareCount:1 MultiShareEnabled:false Name:projects/infrastructure-410808/locations/us-central1-c/instances/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa Networks:[0xc000199a70] Protocol: SatisfiesPzi:true SatisfiesPzs:false State:READY StatusMessage: SuspensionReasons:[] Tier:BASIC_HDD ServerResponse:{HTTPStatusCode:200 Header:map[Cache-Control:[private] Content-Type:[application/json; charset=UTF-8] Date:[Fri, 24 May 2024 03:49:13 GMT] Server:[ESF] Vary:[Origin X-Origin Referer] X-Content-Type-Options:[nosniff] X-Frame-Options:[SAMEORIGIN] X-Xss-Protection:[0]]} ForceSendFields:[] NullFields:[]}
I0524 03:49:13.047481       1 file.go:390] Starting DeleteInstance cloud operation for instance projects/infrastructure-410808/locations/us-central1-c/instances/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa
I0524 03:49:13.135237       1 file.go:396] For instance projects/infrastructure-410808/locations/us-central1-c/instances/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa, waiting for delete op projects/infrastructure-410808/locations/us-central1-c/operations/operation-1716522553077-6192b0d066d64-b502186b-e4baa266 to complete

执行完这个方法,PVC和PV资源就已经被删除了。

快速创建一个csi驱动项目

可以使用某位前辈开源的csibuilder工具来初始化一个csi驱动项目,GitHub链接:https://github.com/zwwhdls/csibuilder。不过这个工具生成的项目使用的k8s相关的库都有些老了,作者也没有维护更新,需要自己修改升级一下相关的库。

用csibuilder生成的csi驱动项目:

截屏2024-05-24 15.03.52

Main.go

var (
	nodeID        = os.Getenv("NODE_NAME")
	endpoint      = os.Getenv("CSI_ENDPOINT")
	runController = flag.Bool("controller", false, "run controller service")
	runNode       = flag.Bool("node", false, "run node service")
)

func main() {
	klog.InitFlags(nil)
	flag.Parse()
	klog.V(5).Infof("node-id=%s endpoint=%s run-controller=%v run-node=%v", nodeID, endpoint, *runController, *runNode)
	drv := csi.NewDriver(endpoint, nodeID)
	if err := drv.Run(*runController, *runNode); err != nil {
		klog.Fatalln(err)
	}
}

identity.go

// GetPluginInfo returns the name and version of the plugin
func (d *Driver) GetPluginInfo(ctx context.Context, request *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
	klog.V(5).Infof("grpc GetPluginInfo...")
	resp := &csi.GetPluginInfoResponse{
		Name:          DriverName,
		VendorVersion: "v1.0.0",
	}
	return resp, nil
}

// GetPluginCapabilities returns the capabilities of the plugin
func (d *Driver) GetPluginCapabilities(ctx context.Context, request *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc GetPluginCapabilities...")
	resp := &csi.GetPluginCapabilitiesResponse{
		Capabilities: []*csi.PluginCapability{
			{
				Type: &csi.PluginCapability_Service_{
					Service: &csi.PluginCapability_Service{
						Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
					},
				},
			},
		},
	}
	return resp, nil
}

// Probe returns the health and readiness of the plugin
func (d *Driver) Probe(ctx context.Context, request *csi.ProbeRequest) (*csi.ProbeResponse, error) {
	klog.V(5).Infof("grpc Probe...")
	return &csi.ProbeResponse{}, nil
}

Node.go

var (
	volumeCaps = []csi.VolumeCapability_AccessMode{
		{
			Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
		}
	}
)

type nodeService struct {
	nodeID string
}

func newNodeService(nodeID string) nodeService {
	return nodeService{
		nodeID: nodeID,
	}
}

// NodeStageVolume is called by the CO when a workload that wants to use the specified volume is placed (scheduled) on a node.
func (n *nodeService) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeStageVolume...")
        // todo impl
	return nil, status.Error(codes.Unimplemented, "")
}

// NodeUnstageVolume is called by the CO when a workload that was using the specified volume is being moved to a different node.
func (n *nodeService) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeUnstageVolume...")
        // todo impl
	return nil, status.Error(codes.Unimplemented, "")
}

// NodePublishVolume mounts the volume on the node.
func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	klog.V(5).Infof("grpc NodePublishVolume...")
        // todo impl
	return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume unmount the volume from the target path
func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeUnpublishVolume...")
        // todo impl
	return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeGetVolumeStats get the volume stats
func (n *nodeService) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
	klog.V(5).Infof("grpc NodeGetVolumeStats...")
	return nil, status.Error(codes.Unimplemented, "")
}

// NodeExpandVolume expand the volume
func (n *nodeService) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeExpandVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}

// NodeGetCapabilities get the node capabilities
func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	response := &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, // 支持NodeStageVolume和NodeUnstageVolume
					},
				},
			},
		},
	}
	return response, nil
}

// NodeGetInfo get the node info
func (n *nodeService) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
	klog.V(5).Infof("grpc NodeGetInfo...")
	return &csi.NodeGetInfoResponse{NodeId: n.nodeID}, nil
}

func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
	hasSupport := func(cap *csi.VolumeCapability) bool {
		for _, c := range volumeCaps {
			if c.GetMode() == cap.AccessMode.GetMode() {
				return true
			}
		}
		return false
	}

	foundAll := true
	for _, c := range volCaps {
		if !hasSupport(c) {
			foundAll = false
		}
	}
	return foundAll
}

controller.go

var (
	controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, // 提供创建删除卷能力
	}
)

type controllerService struct {
}

func newControllerService() controllerService {
	return controllerService{}
}

// CreateVolume creates a volume
func (d *controllerService) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	klog.V(5).Infof("grpc CreateVolume...")

	klog.V(5).Infof("create volume req params. %v", request.Parameters)
	// 这里实现你的卷创建逻辑
	return &csi.CreateVolumeResponse{Volume: &volume}, nil
}

// DeleteVolume deletes a volume
func (d *controllerService) DeleteVolume(ctx context.Context, request *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
	klog.V(5).Infof("grpc DeleteVolume...")
	// 这里实现你的卷删除逻辑
	return nil, nil
}

// ControllerGetCapabilities get controller capabilities
func (d *controllerService) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc ControllerGetCapabilities...")
	var caps []*csi.ControllerServiceCapability
	for _, _cap := range controllerCaps {
		c := &csi.ControllerServiceCapability{
			Type: &csi.ControllerServiceCapability_Rpc{
				Rpc: &csi.ControllerServiceCapability_RPC{
					Type: _cap,
				},
			},
		}
		caps = append(caps, c)
	}
	return &csi.ControllerGetCapabilitiesResponse{Capabilities: caps}, nil
}

// ControllerPublishVolume publish a volume
func (d *controllerService) ControllerPublishVolume(ctx context.Context, request *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerPublishVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}

// ControllerUnpublishVolume unpublish a volume
func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerUnpublishVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}

// ValidateVolumeCapabilities validate volume capabilities
func (d *controllerService) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc ValidateVolumeCapabilities...")
	return nil, status.Error(codes.Unimplemented, "")
}

// ListVolumes list volumes
func (d *controllerService) ListVolumes(ctx context.Context, request *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
	klog.V(5).Infof("grpc ListVolumes...")
	return nil, status.Error(codes.Unimplemented, "")
}

// GetCapacity get capacity
func (d *controllerService) GetCapacity(ctx context.Context, request *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
	klog.V(5).Infof("grpc GetCapacity...")
	return nil, status.Error(codes.Unimplemented, "")
}

// CreateSnapshot create a snapshot
func (d *controllerService) CreateSnapshot(ctx context.Context, request *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
	klog.V(5).Infof("grpc CreateSnapshot...")
	return nil, status.Error(codes.Unimplemented, "")
}

// DeleteSnapshot delete a snapshot
func (d *controllerService) DeleteSnapshot(ctx context.Context, request *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
	klog.V(5).Infof("grpc DeleteSnapshot...")
	return nil, status.Error(codes.Unimplemented, "")
}

// ListSnapshots list snapshots
func (d *controllerService) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
	klog.V(5).Infof("grpc ListSnapshots...")
	return nil, status.Error(codes.Unimplemented, "")
}

// ControllerExpandVolume expand a volume
func (d *controllerService) ControllerExpandVolume(ctx context.Context, request *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerExpandVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}

// ControllerGetVolume get a volume
func (d *controllerService) ControllerGetVolume(ctx context.Context, request *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerGetVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}

func (d *controllerService) ControllerModifyVolume(context.Context, *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerModifyVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}

driver.go

const (
	// DriverName to be registered
	DriverName = "csi.mycsi.wujiuye.com"
)

type Driver struct {
	controllerService
	nodeService

	srv      *grpc.Server
	endpoint string
}

// NewDriver creates a new driver
func NewDriver(endpoint string, nodeID string) *Driver {
	return &Driver{
		endpoint:          endpoint,
		controllerService: newControllerService(),
		nodeService:       newNodeService(nodeID),
	}
}

func (d *Driver) Run(runController, runNode bool) error {
	scheme, addr, err := ParseEndpoint(d.endpoint)
	if err != nil {
		return err
	}

	listener, err := net.Listen(scheme, addr)
	if err != nil {
		return err
	}

	logErr := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		resp, err := handler(ctx, req)
		if err != nil {
			klog.Errorf("GRPC error: %v", err)
		}
		return resp, err
	}
	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(logErr),
	}
	d.srv = grpc.NewServer(opts...)

	csi.RegisterIdentityServer(d.srv, d)
	if runController {
		csi.RegisterControllerServer(d.srv, d)
	}
	if runNode {
		csi.RegisterNodeServer(d.srv, d)
	}

	klog.Infof("Listening for connection on address: %#v", listener.Addr())
	return d.srv.Serve(listener)
}

func ParseEndpoint(endpoint string) (string, string, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", "", fmt.Errorf("could not parse endpoint: %v", err)
	}

	addr := path.Join(u.Host, filepath.FromSlash(u.Path))

	scheme := strings.ToLower(u.Scheme)
	switch scheme {
	case "tcp":
	case "unix":
		addr = path.Join("/", addr)
		if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
			return "", "", fmt.Errorf("could not remove unix domain socket %q: %v", addr, err)
		}
	default:
		return "", "", fmt.Errorf("unsupported protocol: %s", scheme)
	}

	return scheme, addr, nil
}

部署清单:

部署controller组件:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app.kubernetes.io/component: controller
    app.kubernetes.io/name: my-csi-controller
  name: my-csi-controller
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-csi-controller
  serviceName: my-csi-controller
  template:
    metadata:
      labels:
        app: my-csi-controller
    spec:
      serviceAccountName: my-csi-controller
      containers:
        - name: csi-provisioner
          image: quay.io/k8scsi/csi-provisioner:v1.6.0
          args:
            - --csi-address=$(ADDRESS)
            - --timeout=60s
            - --v=5
          env:
            - name: ADDRESS
              value: /var/lib/csi/sockets/pluginproxy/csi.sock
          volumeMounts:
            - mountPath: /var/lib/csi/sockets/pluginproxy/
              name: socket-dir
        - name: my-csi-driver
          image: my-csi-driver:1.0.0
          #imagePullPolicy: Always
          args:
            - --controller=true
            - --v=5
          env:
            - name: CSI_ENDPOINT
              value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          securityContext:
            capabilities:
              add:
                - SYS_ADMIN
            privileged: true
          volumeMounts:
            - mountPath: /var/lib/csi/sockets/pluginproxy/
              name: socket-dir
      volumes:
        - emptyDir: {}
          name: socket-dir

部署node组件:

kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: my-csi-node
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: my-csi-node
  template:
    metadata:
      labels:
        app: my-csi-node
    spec:
      serviceAccountName: my-csi-node
      containers:
        - name: node-driver-registrar
          image: quay.io/k8scsi/csi-node-driver-registrar:v2.1.0
          args:
            - --csi-address=$(ADDRESS)
            - --kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)
            - --v=5
          env:
            - name: ADDRESS
              value: /csi/csi.sock
            - name: DRIVER_REG_SOCK_PATH
              value: /var/lib/kubelet/csi-plugins/csi.mycsi.wujiuye.com/csi.sock
          volumeMounts:
            - mountPath: /csi
              name: plugin-dir
            - mountPath: /registration
              name: registration-dir
        - name: my-csi-driver
          image: my-csi-driver:1.0.0
          #imagePullPolicy: Always
          args:
            - --node=true
            - --v=5
          env:
            - name: CSI_ENDPOINT
              value: unix:/csi/csi.sock
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - rm /csi/csi.sock
          securityContext:
            privileged: true
          volumeMounts:
            - mountPath: /var/lib/kubelet
              mountPropagation: Bidirectional
              name: kubelet-dir
            - mountPath: /csi
              name: plugin-dir
            - mountPath: /registration
              name: registration-dir
      volumes:
        - hostPath:
            path: /var/lib/kubelet
            type: Directory
          name: kubelet-dir
        - hostPath:
            path: /var/lib/kubelet/csi-plugins/csi.mycsi.wujiuye.com/
            type: DirectoryOrCreate
          name: plugin-dir
        - hostPath:
            path: /var/lib/kubelet/plugins_registry/
            type: Directory
          name: registration-dir

CSIDriver:

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  labels:
    app.kubernetes.io/instance: my-csi-driver
    app.kubernetes.io/name: my-csi-driver
    app.kubernetes.io/version: master
  name: csi.mycsi.wujiuye.com
spec:
  attachRequired: false
  podInfoOnMount: true

rbac权限相关:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-csi-node-role
rules: []
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-csi-controller-role
rules:
  - apiGroups:
      - ""
    resources:
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
      - create
      - delete
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
    verbs:
      - get
      - list
      - watch
      - update
  - apiGroups:
      - storage.k8s.io
    resources:
      - storageclasses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - get
      - list
      - watch
      - create
      - update
      - patch
  - apiGroups:
      - storage.k8s.io
    resources:
      - csinodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
  ---
  apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-csi-node-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: my-csi-node-role
subjects:
  - kind: ServiceAccount
    name: my-csi-node
    namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-csi-controller-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: my-csi-controller-role
subjects:
  - kind: ServiceAccount
    name: kube-csi-controller
    namespace: kube-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-csi-controller
  namespace: kube-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-csi-node
  namespace: kube-system
#云原生

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

cloudrun和gke中服务互相调用如何走内网

gke和cloudrun混合部署的情况下,gke中的服务如何走内网调用cloudrun服务,以及cloudrun服务如何走内网调用gke中的服务是需要解决的问题。

实验:在一个demoset pod中,是否能使用tc拦截同node上其它pod的流量

在k8s集群中,验证在一个demoset的pod中,是否能使用tc来拦截部署在同node上的其它pod的流量?

通过实验理解csi卷创建、挂载、卸载、删除的全流程

基于自定义的csi驱动,通过实验串联理解真个csi的工作流程。

Kubernetes CSI插件中持久卷的挂载和映射机制

持久卷是什么时候创建的,是什么时候挂载的,是怎么创建怎么挂载的?

gcp平台google api的授权,与autoscaler部署配置授权

我们在自己部署autoscaler到gke集群中的时候遇到了403的问题,这个问题后来我们自己部署gcp-filestore-csi-driver的时候也遇到了。

在gcp平台上创建一个gke集群,怎么获取gke集群的证书

在gcp平台上,使用gke服务,创建一个k8s集群,若想在本地能够通过kubectl命令或者可视化工具访问到集群,需要通过gcloud命令获取访问集群的证书。