Original article by 吴就业, 2024-05-24
This is an original article by the author. Do not reproduce it without the author's permission.
Article link: https://www.wujiuye.com/article/b7b6a87848a34b79abc48c9bd17ed00a
Author: 吴就业
Source: 吴就业的网络日记 (Wu Jiuye's web journal)
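For background on how persistent volumes are created and mounted, see the previous article: Kubernetes CSI插件中持久卷的挂载和映射机制 (How persistent volumes are mounted and mapped in Kubernetes CSI plugins).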
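A CSI driver needs these essential components: csi-provisioner, csi-node-driver-registrar, a Controller component and a Node component.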
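csi-provisioner and csi-node-driver-registrar are sidecar components provided by the Kubernetes project. We do not develop them ourselves, but we do have to deploy them alongside our driver (see the deployment example at the end of this article). Other sidecars are optional and are described in the official documentation.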
The Controller and Node components are the parts we develop. In practice this means implementing a few gRPC interfaces.
Interface that both the Controller and Node components must implement:
type IdentityServer interface {
	GetPluginInfo(context.Context, *GetPluginInfoRequest) (*GetPluginInfoResponse, error)
	GetPluginCapabilities(context.Context, *GetPluginCapabilitiesRequest) (*GetPluginCapabilitiesResponse, error)
	Probe(context.Context, *ProbeRequest) (*ProbeResponse, error)
}
Interface the Controller component must implement:
type ControllerServer interface {
	CreateVolume(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
	DeleteVolume(context.Context, *DeleteVolumeRequest) (*DeleteVolumeResponse, error)
	ControllerPublishVolume(context.Context, *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
	ControllerUnpublishVolume(context.Context, *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
	ValidateVolumeCapabilities(context.Context, *ValidateVolumeCapabilitiesRequest) (*ValidateVolumeCapabilitiesResponse, error)
	ListVolumes(context.Context, *ListVolumesRequest) (*ListVolumesResponse, error)
	GetCapacity(context.Context, *GetCapacityRequest) (*GetCapacityResponse, error)
	ControllerGetCapabilities(context.Context, *ControllerGetCapabilitiesRequest) (*ControllerGetCapabilitiesResponse, error)
	CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
	DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*DeleteSnapshotResponse, error)
	ListSnapshots(context.Context, *ListSnapshotsRequest) (*ListSnapshotsResponse, error)
	ControllerExpandVolume(context.Context, *ControllerExpandVolumeRequest) (*ControllerExpandVolumeResponse, error)
	ControllerGetVolume(context.Context, *ControllerGetVolumeRequest) (*ControllerGetVolumeResponse, error)
	ControllerModifyVolume(context.Context, *ControllerModifyVolumeRequest) (*ControllerModifyVolumeResponse, error)
}
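At minimum, the Controller component only needs real implementations of the following ControllerServer methods. The remaining methods can return codes.Unimplemented, as the generated project does:
CreateVolume: creates a persistent volume.
DeleteVolume: deletes a persistent volume.
ControllerGetCapabilities: returns the list of capabilities (features) the CSI driver supports. For example, a driver that only supports creating and deleting volumes (it implements only CreateVolume and DeleteVolume) can implement ControllerGetCapabilities like this: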
var (
	controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
	}
)
func (d *controllerService) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc ControllerGetCapabilities...")
	var caps []*csi.ControllerServiceCapability
	for _, _cap := range controllerCaps {
		c := &csi.ControllerServiceCapability{
			Type: &csi.ControllerServiceCapability_Rpc{
				Rpc: &csi.ControllerServiceCapability_RPC{
					Type: _cap,
				},
			},
		}
		caps = append(caps, c)
	}
	return &csi.ControllerGetCapabilitiesResponse{Capabilities: caps}, nil
}
Interface the Node component must implement:
type NodeServer interface {
	NodeStageVolume(context.Context, *NodeStageVolumeRequest) (*NodeStageVolumeResponse, error)
	NodeUnstageVolume(context.Context, *NodeUnstageVolumeRequest) (*NodeUnstageVolumeResponse, error)
	NodePublishVolume(context.Context, *NodePublishVolumeRequest) (*NodePublishVolumeResponse, error)
	NodeUnpublishVolume(context.Context, *NodeUnpublishVolumeRequest) (*NodeUnpublishVolumeResponse, error)
	NodeGetVolumeStats(context.Context, *NodeGetVolumeStatsRequest) (*NodeGetVolumeStatsResponse, error)
	NodeExpandVolume(context.Context, *NodeExpandVolumeRequest) (*NodeExpandVolumeResponse, error)
	NodeGetCapabilities(context.Context, *NodeGetCapabilitiesRequest) (*NodeGetCapabilitiesResponse, error)
	NodeGetInfo(context.Context, *NodeGetInfoRequest) (*NodeGetInfoResponse, error)
}
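At minimum, the Node component only needs real implementations of these NodeServer methods:
NodePublishVolume: mounts the volume at the Pod's target path.
NodeUnpublishVolume: unmounts the volume from the Pod's target path.
NodeGetCapabilities: reports which optional Node RPCs the driver supports.
NodeGetInfo: returns the ID of the node the plugin runs on.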
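In the previous article we debugged the gcp-filestore-csi-driver CSI driver and saw that NodeStageVolume was called before NodePublishVolume. NodeStageVolume mounts the volume on the node once. After that, every Pod that uses the same volume and is scheduled onto that node can mount the staged directory directly.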
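If you want kubelet to call NodeStageVolume, you need to implement at least these additional methods:
NodeStageVolume: mounts the volume once at a node-global staging path.
NodeUnstageVolume: unmounts the volume from the staging path.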
NodeGetCapabilities must then report support for NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, for example:
func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	response := &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
					},
				},
			},
		},
	}
	return response, nil
}
The volume mount flow is covered in the previous article: Kubernetes CSI插件中持久卷的挂载和映射机制.
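In short: for a driver that supports staging and does not require attach, volume setup runs CreateVolume on the Controller component, then NodeStageVolume and NodePublishVolume on the Node component. kubelet makes both Node calls.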
We again use the gcp-filestore-csi-driver CSI driver as an example, debugging it to understand the unmount flow.
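While any Pod references the PVC, neither the PVC nor the PV can be removed. Deleting the PVC only leaves it in the Terminating state, held by the kubernetes.io/pvc-protection finalizer, and the bound PV is likewise protected by kubernetes.io/pv-protection. The PVC can only be removed after every Pod that references it has been deleted.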
When the Pod is deleted, kubelet calls NodeUnpublishVolume on the Node component running on the Pod's node. This unmounts the volume from the Pod's target path.
# Node component's NodeUnpublishVolume is called: unmount the Pod's target path
I0524 03:49:11.869841 1 utils.go:55] GRPC call: /csi.v1.Node/NodeUnpublishVolume
I0524 03:49:11.870357 1 utils.go:56] GRPC request: {"target_path":"/var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount","volume_id":"modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1"}
I0524 03:49:11.873596 1 mount_helper_common.go:93] unmounting "/var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount" (corruptedMount: false, mounterCanSkipMountPointChecks: true)
I0524 03:49:11.874286 1 mount_linux.go:360] Unmounting /var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount
I0524 03:49:11.926782 1 mount_helper_common.go:150] Warning: deleting path "/var/lib/kubelet/pods/82da039b-0f63-4923-aa1d-230e14a2855e/volumes/kubernetes.io~csi/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/mount"
I0524 03:49:11.927006 1 utils.go:61] GRPC response:
Next, kubelet calls NodeGetCapabilities. The log shows that it reports STAGE_UNSTAGE_VOLUME, so kubelet goes on to call NodeUnstageVolume, which unmounts the persistent volume (PV) from the node's staging path.
# Node component's NodeGetCapabilities is called
I0524 03:49:11.973778 1 utils.go:55] GRPC call: /csi.v1.Node/NodeGetCapabilities
I0524 03:49:11.973913 1 utils.go:56] GRPC request: {}
I0524 03:49:11.973939 1 utils.go:61] GRPC response: capabilities:<rpc:<type:STAGE_UNSTAGE_VOLUME > > capabilities:<rpc:<type:GET_VOLUME_STATS > >
# NodeUnstageVolume is called
I0524 03:49:11.975045 1 utils.go:55] GRPC call: /csi.v1.Node/NodeUnstageVolume
I0524 03:49:11.975197 1 utils.go:56] GRPC request: {"staging_target_path":"/var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount","volume_id":"modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1"}
I0524 03:49:11.975285 1 mount_helper_common.go:93] unmounting "/var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount" (corruptedMount: false, mounterCanSkipMountPointChecks: true)
I0524 03:49:11.975331 1 mount_linux.go:360] Unmounting /var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount
I0524 03:49:15.001737 1 mount_helper_common.go:150] Warning: deleting path "/var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount"
I0524 03:49:15.003393 1 node.go:375] NodeUnstageVolume succeeded on volume modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1 from staging target path /var/lib/kubelet/plugins/kubernetes.io/csi/filestore.csi.storage.gke.io/270b9be2ea0df2b0c8b6f6871b7725c553ad5e5f5d3a43e496a85b671461c861/globalmount
I0524 03:49:15.003460 1 utils.go:61] GRPC response:
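Once no node has the PV mounted, the PVC can be deleted. When the PVC is deleted, Kubernetes calls the Controller component's DeleteVolume to delete the persistent volume (PV). More precisely, the csi-provisioner sidecar makes this call when the released PV's reclaim policy is Delete.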
I0524 03:49:12.881989 1 controller.go:373] DeleteVolume called with request volume_id:"modeInstance/us-central1-c/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa/vol1"
I0524 03:49:13.046257 1 file.go:326] GetInstance call fetched instance &{CapacityGb:1024 CapacityStepSizeGb:1 CreateTime:2024-05-20T09:46:02.447035850Z Description: DirectoryServices:<nil> Etag: FileShares:[0xc000475b20] KmsKeyName: Labels:map[kubernetes_io_created-for_pv_name:pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa kubernetes_io_created-for_pvc_name:test-demo-pvc kubernetes_io_created-for_pvc_namespace:default storage_gke_io_created-by:filestore_csi_storage_gke_io] MaxCapacityGb:65434 MaxShareCount:1 MultiShareEnabled:false Name:projects/infrastructure-410808/locations/us-central1-c/instances/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa Networks:[0xc000199a70] Protocol: SatisfiesPzi:true SatisfiesPzs:false State:READY StatusMessage: SuspensionReasons:[] Tier:BASIC_HDD ServerResponse:{HTTPStatusCode:200 Header:map[Cache-Control:[private] Content-Type:[application/json; charset=UTF-8] Date:[Fri, 24 May 2024 03:49:13 GMT] Server:[ESF] Vary:[Origin X-Origin Referer] X-Content-Type-Options:[nosniff] X-Frame-Options:[SAMEORIGIN] X-Xss-Protection:[0]]} ForceSendFields:[] NullFields:[]}
I0524 03:49:13.047481 1 file.go:390] Starting DeleteInstance cloud operation for instance projects/infrastructure-410808/locations/us-central1-c/instances/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa
I0524 03:49:13.135237 1 file.go:396] For instance projects/infrastructure-410808/locations/us-central1-c/instances/pvc-dcba2328-caad-4dc6-bc70-bd7c5abf4faa, waiting for delete op projects/infrastructure-410808/locations/us-central1-c/operations/operation-1716522553077-6192b0d066d64-b502186b-e4baa266 to complete
Once this method returns, both the PVC and the PV resources have been deleted.
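The DeleteVolume call above waits for a cloud operation to finish, and csi-provisioner retries calls that fail or time out. For that reason the CSI spec requires DeleteVolume to be idempotent: if the volume no longer exists, the plugin must still report success. The stdlib-only sketch below demonstrates that rule against a hypothetical in-memory backend. `fakeBackend`, `errNotFound` and `deleteVolume` are illustrative names and are not taken from gcp-filestore-csi-driver or csibuilder. In a real gRPC handler, the empty-ID case would map to `codes.InvalidArgument`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotFound is what the hypothetical backend returns for unknown volumes.
var errNotFound = errors.New("volume not found")

// fakeBackend stands in for a real storage API (e.g. a cloud file-share service).
type fakeBackend struct {
	mu      sync.Mutex
	volumes map[string]bool
}

func (b *fakeBackend) delete(id string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.volumes[id] {
		return errNotFound
	}
	delete(b.volumes, id)
	return nil
}

// deleteVolume sketches the core of an idempotent DeleteVolume handler:
// a volume that is already gone counts as successfully deleted.
func deleteVolume(b *fakeBackend, volumeID string) error {
	if volumeID == "" {
		// A real handler would return status.Error(codes.InvalidArgument, ...).
		return errors.New("volume ID missing in request")
	}
	if err := b.delete(volumeID); err != nil && !errors.Is(err, errNotFound) {
		return fmt.Errorf("delete %q: %w", volumeID, err)
	}
	return nil
}

func main() {
	b := &fakeBackend{volumes: map[string]bool{"vol1": true}}
	fmt.Println(deleteVolume(b, "vol1")) // <nil> (deleted)
	fmt.Println(deleteVolume(b, "vol1")) // <nil> (already gone, still success)
	fmt.Println(deleteVolume(b, ""))     // volume ID missing in request
}
```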
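You can scaffold a CSI driver project with csibuilder, an open-source tool written by another developer: https://github.com/zwwhdls/csibuilder. The Kubernetes-related libraries in the generated project are somewhat dated, however, and the author no longer maintains the tool, so you will need to upgrade those dependencies yourself.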
A CSI driver project generated with csibuilder:
Main.go
package main
import (
	"flag"
	"os"
	"k8s.io/klog/v2"
	// Hypothetical import path: replace it with the driver package's path in your module.
	"example.com/my-csi-driver/pkg/csi"
)
var (
	nodeID        = os.Getenv("NODE_NAME")
	endpoint      = os.Getenv("CSI_ENDPOINT")
	runController = flag.Bool("controller", false, "run controller service")
	runNode       = flag.Bool("node", false, "run node service")
)
func main() {
	klog.InitFlags(nil) // registers klog flags such as --v
	flag.Parse()
	klog.V(5).Infof("node-id=%s endpoint=%s run-controller=%v run-node=%v", nodeID, endpoint, *runController, *runNode)
	drv := csi.NewDriver(endpoint, nodeID)
	if err := drv.Run(*runController, *runNode); err != nil {
		klog.Fatalln(err)
	}
}
identity.go
package csi // assumed package name: main.go refers to this package as csi
import (
	"context"
	"github.com/container-storage-interface/spec/lib/go/csi"
	"k8s.io/klog/v2"
)
// GetPluginInfo returns the name and version of the plugin
func (d *Driver) GetPluginInfo(ctx context.Context, request *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
	klog.V(5).Infof("grpc GetPluginInfo...")
	resp := &csi.GetPluginInfoResponse{
		Name:          DriverName,
		VendorVersion: "v1.0.0",
	}
	return resp, nil
}
// GetPluginCapabilities returns the capabilities of the plugin
func (d *Driver) GetPluginCapabilities(ctx context.Context, request *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc GetPluginCapabilities...")
	resp := &csi.GetPluginCapabilitiesResponse{
		Capabilities: []*csi.PluginCapability{
			{
				Type: &csi.PluginCapability_Service_{
					Service: &csi.PluginCapability_Service{
						Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
					},
				},
			},
		},
	}
	return resp, nil
}
// Probe returns the health and readiness of the plugin
func (d *Driver) Probe(ctx context.Context, request *csi.ProbeRequest) (*csi.ProbeResponse, error) {
	klog.V(5).Infof("grpc Probe...")
	return &csi.ProbeResponse{}, nil
}
Node.go
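package csi // assumed package name: main.go refers to this package as csi
import (
	"context"
	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"k8s.io/klog/v2"
)
var (
	// Access modes this driver supports. Pointers avoid copying protobuf messages by value.
	volumeCaps = []*csi.VolumeCapability_AccessMode{
		{
			Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
		},
	}
)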
type nodeService struct {
	nodeID string
}
func newNodeService(nodeID string) nodeService {
	return nodeService{
		nodeID: nodeID,
	}
}
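// NodeStageVolume is called by the CO when a workload that wants to use the specified volume is placed (scheduled) on a node.
// It should mount the volume once at request.GetStagingTargetPath().
func (n *nodeService) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeStageVolume...")
	// TODO: implement. Return success instead of Unimplemented: NodeGetCapabilities
	// advertises STAGE_UNSTAGE_VOLUME, so kubelet calls this before NodePublishVolume
	// and an error here would make every mount fail.
	return &csi.NodeStageVolumeResponse{}, nil
}
// NodeUnstageVolume is called by the CO when a workload that was using the specified volume is being moved to a different node.
// It should unmount the volume from request.GetStagingTargetPath().
func (n *nodeService) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeUnstageVolume...")
	// TODO: implement (returns success for the same reason as NodeStageVolume).
	return &csi.NodeUnstageVolumeResponse{}, nil
}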
// NodePublishVolume mounts the volume at the Pod's target path.
func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	klog.V(5).Infof("grpc NodePublishVolume...")
	// TODO: implement (mount or bind-mount to request.GetTargetPath()).
	return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unmounts the volume from the target path.
func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeUnpublishVolume...")
	// TODO: implement
	return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetVolumeStats gets the volume stats.
func (n *nodeService) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
	klog.V(5).Infof("grpc NodeGetVolumeStats...")
	return nil, status.Error(codes.Unimplemented, "")
}
// NodeExpandVolume expands the volume.
func (n *nodeService) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
	klog.V(5).Infof("grpc NodeExpandVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}
// NodeGetCapabilities gets the node capabilities.
func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	response := &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, // supports NodeStageVolume and NodeUnstageVolume
					},
				},
			},
		},
	}
	return response, nil
}
// NodeGetInfo gets the node info.
func (n *nodeService) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
	klog.V(5).Infof("grpc NodeGetInfo...")
	return &csi.NodeGetInfoResponse{NodeId: n.nodeID}, nil
}
// isValidVolumeCapabilities reports whether every requested capability uses a supported access mode.
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
	hasSupport := func(c *csi.VolumeCapability) bool {
		for _, m := range volumeCaps {
			// GetAccessMode/GetMode are nil-safe protobuf getters.
			if m.GetMode() == c.GetAccessMode().GetMode() {
				return true
			}
		}
		return false
	}
	for _, c := range volCaps {
		if !hasSupport(c) {
			return false
		}
	}
	return true
}
controller.go
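package csi // assumed package name: main.go refers to this package as csi
import (
	"context"
	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"k8s.io/klog/v2"
)
var (
	controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, // provides the create/delete volume capability
	}
)
type controllerService struct {
}
func newControllerService() controllerService {
	return controllerService{}
}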
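// CreateVolume creates a volume
func (d *controllerService) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	klog.V(5).Infof("grpc CreateVolume...")
	klog.V(5).Infof("create volume req params. %v", request.GetParameters())
	if request.GetName() == "" {
		return nil, status.Error(codes.InvalidArgument, "volume name is required")
	}
	// Implement your volume creation logic here. The placeholder below uses the
	// request name as the volume ID and reports the requested size.
	volume := &csi.Volume{
		VolumeId:      request.GetName(),
		CapacityBytes: request.GetCapacityRange().GetRequiredBytes(),
		VolumeContext: request.GetParameters(),
	}
	return &csi.CreateVolumeResponse{Volume: volume}, nil
}
// DeleteVolume deletes a volume
func (d *controllerService) DeleteVolume(ctx context.Context, request *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
	klog.V(5).Infof("grpc DeleteVolume...")
	// Implement your volume deletion logic here. Per the CSI spec, deleting a
	// volume that no longer exists must also succeed.
	// On success, return an empty (non-nil) response message.
	return &csi.DeleteVolumeResponse{}, nil
}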
// ControllerGetCapabilities get controller capabilities
func (d *controllerService) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc ControllerGetCapabilities...")
	var caps []*csi.ControllerServiceCapability
	for _, _cap := range controllerCaps {
		c := &csi.ControllerServiceCapability{
			Type: &csi.ControllerServiceCapability_Rpc{
				Rpc: &csi.ControllerServiceCapability_RPC{
					Type: _cap,
				},
			},
		}
		caps = append(caps, c)
	}
	return &csi.ControllerGetCapabilitiesResponse{Capabilities: caps}, nil
}
// ControllerPublishVolume publish a volume
func (d *controllerService) ControllerPublishVolume(ctx context.Context, request *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerPublishVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}
// ControllerUnpublishVolume unpublish a volume
func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerUnpublishVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}
// ValidateVolumeCapabilities validate volume capabilities
func (d *controllerService) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
	klog.V(5).Infof("grpc ValidateVolumeCapabilities...")
	return nil, status.Error(codes.Unimplemented, "")
}
// ListVolumes list volumes
func (d *controllerService) ListVolumes(ctx context.Context, request *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
	klog.V(5).Infof("grpc ListVolumes...")
	return nil, status.Error(codes.Unimplemented, "")
}
// GetCapacity get capacity
func (d *controllerService) GetCapacity(ctx context.Context, request *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
	klog.V(5).Infof("grpc GetCapacity...")
	return nil, status.Error(codes.Unimplemented, "")
}
// CreateSnapshot create a snapshot
func (d *controllerService) CreateSnapshot(ctx context.Context, request *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
	klog.V(5).Infof("grpc CreateSnapshot...")
	return nil, status.Error(codes.Unimplemented, "")
}
// DeleteSnapshot delete a snapshot
func (d *controllerService) DeleteSnapshot(ctx context.Context, request *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
	klog.V(5).Infof("grpc DeleteSnapshot...")
	return nil, status.Error(codes.Unimplemented, "")
}
// ListSnapshots list snapshots
func (d *controllerService) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
	klog.V(5).Infof("grpc ListSnapshots...")
	return nil, status.Error(codes.Unimplemented, "")
}
// ControllerExpandVolume expand a volume
func (d *controllerService) ControllerExpandVolume(ctx context.Context, request *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerExpandVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}
// ControllerGetVolume get a volume
func (d *controllerService) ControllerGetVolume(ctx context.Context, request *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerGetVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}
func (d *controllerService) ControllerModifyVolume(context.Context, *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
	klog.V(5).Infof("grpc ControllerModifyVolume...")
	return nil, status.Error(codes.Unimplemented, "")
}
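The placeholder CreateVolume above simply echoes `required_bytes`. A CSI CapacityRange has two optional bounds. `required_bytes` means the volume must be at least this big, and `limit_bytes` means it must be no bigger than this. A value of 0 means the bound is unset, and if both are unset the plugin chooses the size. The sketch below shows one way a driver might pick the value it reports in `Volume.CapacityBytes`. It assumes a hypothetical 1 GiB default and a backend that can provision any byte size. In the real handler the inputs would come from `request.GetCapacityRange().GetRequiredBytes()` and `GetLimitBytes()`, and `pickCapacity` is an illustrative name.

```go
package main

import (
	"errors"
	"fmt"
)

// defaultVolumeBytes is a hypothetical size used when no range is requested.
const defaultVolumeBytes int64 = 1 << 30 // 1 GiB

// pickCapacity chooses a size from a CSI CapacityRange.
// required is required_bytes, limit is limit_bytes; 0 means "unset".
// A real driver would turn errors into a gRPC status such as codes.OutOfRange.
func pickCapacity(required, limit int64) (int64, error) {
	if required < 0 || limit < 0 {
		return 0, errors.New("capacity range must not be negative")
	}
	if limit > 0 && required > limit {
		return 0, fmt.Errorf("required_bytes %d exceeds limit_bytes %d", required, limit)
	}
	if required > 0 {
		return required, nil // smallest size that satisfies the request
	}
	if limit > 0 && limit < defaultVolumeBytes {
		return limit, nil // default would violate the upper bound
	}
	return defaultVolumeBytes, nil
}

func main() {
	fmt.Println(pickCapacity(0, 0))  // 1073741824 <nil>
	fmt.Println(pickCapacity(10, 5)) // 0 required_bytes 10 exceeds limit_bytes 5
}
```

A real backend may also need to round the size up to its own granularity. The Filestore instance in the DeleteVolume logs, for example, reports CapacityGb:1024 and CapacityStepSizeGb:1.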
driver.go
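package csi // assumed package name: main.go refers to this package as csi
import (
	"context"
	"fmt"
	"net"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
	"k8s.io/klog/v2"
)
const (
	// DriverName is the name registered with kubelet; it must match the CSIDriver object's name.
	DriverName = "csi.mycsi.wujiuye.com"
)
type Driver struct {
	controllerService
	nodeService
	srv      *grpc.Server
	endpoint string
}
// NewDriver creates a new driver
func NewDriver(endpoint string, nodeID string) *Driver {
	return &Driver{
		endpoint:          endpoint,
		controllerService: newControllerService(),
		nodeService:       newNodeService(nodeID),
	}
}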
func (d *Driver) Run(runController, runNode bool) error {
	scheme, addr, err := ParseEndpoint(d.endpoint)
	if err != nil {
		return err
	}
	listener, err := net.Listen(scheme, addr)
	if err != nil {
		return err
	}
	logErr := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		resp, err := handler(ctx, req)
		if err != nil {
			klog.Errorf("GRPC error: %v", err)
		}
		return resp, err
	}
	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(logErr),
	}
	d.srv = grpc.NewServer(opts...)
	csi.RegisterIdentityServer(d.srv, d)
	if runController {
		csi.RegisterControllerServer(d.srv, d)
	}
	if runNode {
		csi.RegisterNodeServer(d.srv, d)
	}
	klog.Infof("Listening for connection on address: %#v", listener.Addr())
	return d.srv.Serve(listener)
}
func ParseEndpoint(endpoint string) (string, string, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", "", fmt.Errorf("could not parse endpoint: %v", err)
	}
	addr := path.Join(u.Host, filepath.FromSlash(u.Path))
	scheme := strings.ToLower(u.Scheme)
	switch scheme {
	case "tcp":
	case "unix":
		addr = path.Join("/", addr)
		if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
			return "", "", fmt.Errorf("could not remove unix domain socket %q: %v", addr, err)
		}
	default:
		return "", "", fmt.Errorf("unsupported protocol: %s", scheme)
	}
	return scheme, addr, nil
}
Deployment manifests:
Deploy the Controller component:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app.kubernetes.io/component: controller
    app.kubernetes.io/name: my-csi-controller
  name: my-csi-controller
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-csi-controller
  serviceName: my-csi-controller
  template:
    metadata:
      labels:
        app: my-csi-controller
    spec:
      serviceAccountName: my-csi-controller
      containers:
        - name: csi-provisioner
          image: quay.io/k8scsi/csi-provisioner:v1.6.0
          args:
            - --csi-address=$(ADDRESS)
            - --timeout=60s
            - --v=5
          env:
            - name: ADDRESS
              value: /var/lib/csi/sockets/pluginproxy/csi.sock
          volumeMounts:
            - mountPath: /var/lib/csi/sockets/pluginproxy/
              name: socket-dir
        - name: my-csi-driver
          image: my-csi-driver:1.0.0
          #imagePullPolicy: Always
          args:
            - --controller=true
            - --v=5
          env:
            - name: CSI_ENDPOINT
              value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          securityContext:
            capabilities:
              add:
                - SYS_ADMIN
            privileged: true
          volumeMounts:
            - mountPath: /var/lib/csi/sockets/pluginproxy/
              name: socket-dir
      volumes:
        - emptyDir: {}
          name: socket-dir
Deploy the Node component:
kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: my-csi-node
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: my-csi-node
  template:
    metadata:
      labels:
        app: my-csi-node
    spec:
      serviceAccountName: my-csi-node
      containers:
        - name: node-driver-registrar
          image: quay.io/k8scsi/csi-node-driver-registrar:v2.1.0
          args:
            - --csi-address=$(ADDRESS)
            - --kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)
            - --v=5
          env:
            - name: ADDRESS
              value: /csi/csi.sock
            - name: DRIVER_REG_SOCK_PATH
              value: /var/lib/kubelet/csi-plugins/csi.mycsi.wujiuye.com/csi.sock
          volumeMounts:
            - mountPath: /csi
              name: plugin-dir
            - mountPath: /registration
              name: registration-dir
        - name: my-csi-driver
          image: my-csi-driver:1.0.0
          #imagePullPolicy: Always
          args:
            - --node=true
            - --v=5
          env:
            - name: CSI_ENDPOINT
              value: unix:/csi/csi.sock
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - rm /csi/csi.sock
          securityContext:
            privileged: true
          volumeMounts:
            - mountPath: /var/lib/kubelet
              mountPropagation: Bidirectional
              name: kubelet-dir
            - mountPath: /csi
              name: plugin-dir
            - mountPath: /registration
              name: registration-dir
      volumes:
        - hostPath:
            path: /var/lib/kubelet
            type: Directory
          name: kubelet-dir
        - hostPath:
            path: /var/lib/kubelet/csi-plugins/csi.mycsi.wujiuye.com/
            type: DirectoryOrCreate
          name: plugin-dir
        - hostPath:
            path: /var/lib/kubelet/plugins_registry/
            type: Directory
          name: registration-dir
CSIDriver:
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  labels:
    app.kubernetes.io/instance: my-csi-driver
    app.kubernetes.io/name: my-csi-driver
    app.kubernetes.io/version: master
  name: csi.mycsi.wujiuye.com
spec:
  attachRequired: false
  podInfoOnMount: true
RBAC resources:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-csi-node-role
rules: []
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-csi-controller-role
rules:
  - apiGroups:
      - ""
    resources:
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
      - create
      - delete
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
    verbs:
      - get
      - list
      - watch
      - update
  - apiGroups:
      - storage.k8s.io
    resources:
      - storageclasses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - get
      - list
      - watch
      - create
      - update
      - patch
  - apiGroups:
      - storage.k8s.io
    resources:
      - csinodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-csi-node-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: my-csi-node-role
subjects:
  - kind: ServiceAccount
    name: my-csi-node
    namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-csi-controller-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: my-csi-controller-role
subjects:
  - kind: ServiceAccount
    # Must match the ServiceAccount used by the controller StatefulSet.
    name: my-csi-controller
    namespace: kube-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-csi-controller
  namespace: kube-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-csi-node
  namespace: kube-system