ドキドキするとき無敵でしょ

映画とプログラミングの話

flannelのcni-pluginを読む

Windowsを起動すると4、5分でブルースクリーン、Gunziです。とてもつらい。

お盆休み中にプロトコル・スタック自作をしてだいぶモチベーションが回復したので、
しばらくk8sのネットワーク部分の探検をしようと思い立ち、とりあえずcni-pluginがよくわからんので、作ってみることにした。

色々やってみた結果、よくわからなかったので、よく聞くflannelのソースを読んで他のCNIプラグインがどのように動作しているのか?
をまずは調べつつ、自作CNIプラグインチャレンジをすることにした。

ただ、前提知識がやたらに多く、説明が抜けていたり若干間違えている可能性が高いので、ご了承いただきたい。

とりあえず手を動かす

どうやらShellで自作した人がいるようなので、自宅にk8sの環境を構築してやってみた。

www.altoros.com

ノード間の通信はうまくいかなかったが単一ノード内の通信はうまくできた。
どうやら入力自体は標準入力で受け取る関係上、理論上はどんな言語でも構わないようだ。

コードを読むとどうやらpodに仮想ニックを刺して作成したネットワーク名前空間に接続しているようだった。何をして動いてるかはなんとなく掴めた。

CNIプラグインとは

というわけでこいつがなんなのかについての話。

公式ドキュメントはこちら。

Network Plugins

よくわからん…
色々な資料やソースコードを読み漁ったところ、コンテナのネットワークを作成・削除するための仕組みらしい。ContainerNetworkingInterfaceだもん、そりゃそうか。

いい感じにk8sネットワークの仕組みの全体を解説している日本語資料があったので、ググりつつ、こちらで全体像を学ばせてもらった。見切り発車で始めたので、概要を掴むのにとても良かった。

speakerdeck.com

cni-plugin は仕様により実装するコマンドが決まっている。これはcni-pluginの仕様書で、この仕様書にあるCNI operationsの項目が実装する必要のあるコマンドになる。

仕様書によれば

  • ADD
  • DEL
  • CHECK

の3つを実装する必要がある。ここからはflannelのcni-pluginにある、ADD,DELCHECKに対応しているコマンドの部分を読んでいこうと思う。

なぜflannelかというと、有名なイメージが強く、システム自体がとてもシンプルなので参考にするのに向いていると思ったからだ。クラスターネットワークの仕組みもシンプルなので、実際に作成する際には参考にする。

実装を読んでみる

というわけでflannelのcni-pluginのソースコードを実際に読んでみた。 読んで思ったことや、処理についてコメントを思い思いに残しているので、いい感じにみなさんも読みとっていただければと思う。

github.com

cmdAdd()

cmdAdd()はコンテナをネットワークに追加する。
flannelではcmdAdd()→doCmdAdd→delegateCmdAddの順で処理している。
delegateCmdAddではinvoke.DelegateAddを呼び出している。
パッケージに説明があり、この関数はCNI ADD、もしくはJSONコンフィグを使用して指定されたdelegate pluginを呼び出している。
デフォルトではブリッジプラグインのため、構成時に指定されたブリッジプラグインのADDコマンドを実行している。
”bridge”以外が指定されていた場合はそれらを呼び出す。はず。多分。

delegateAddの名前の通り、最後にinvoke.DelegateAddを呼び、ブリッジプラグインのADDを実行している。
invoke package - github.com/containernetworking/cni/pkg/invoke - Go Packages

func cmdAdd(args *skel.CmdArgs) error {
// loadFlannelNetConf
// 標準入力でNetConfを受取り、ロードしてJSONをアンマーシャルする
    n, err := loadFlannelNetConf(args.StdinData)
    if err != nil {
        return fmt.Errorf("loadFlannelNetConf failed: %w", err)
    }

// /run/flannel/subnet.env からネットワーク構成を読み取る
    fenv, err := loadFlannelSubnetEnv(n.SubnetFile)
    if err != nil {
        return fmt.Errorf("loadFlannelSubnetEnv failed: %w", err)
    }

// delegateの値はflannelプラグインはデフォルトでブリッジプラグインに移譲
// 追加の設定値をブリッジプラグインに渡す必要がある場合はDelegateフィールドを利用
    if n.Delegate == nil {
        n.Delegate = make(map[string]interface{})
    } else {
// それぞれDelegateマップにキーが存在するかをチェックしている
// typeが存在するかつ値が文字列でない
        if hasKey(n.Delegate, "type") && !isString(n.Delegate["type"]) {
            return fmt.Errorf("'delegate' dictionary, if present, must have (string) 'type' field")
        }
// nameが存在する
        if hasKey(n.Delegate, "name") {
            return fmt.Errorf("'delegate' dictionary must not have 'name' field, it'll be set by flannel")
        }
// ipamが存在する
        if hasKey(n.Delegate, "ipam") {
            return fmt.Errorf("'delegate' dictionary must not have 'ipam' field, it'll be set by flannel")
        }
    }

// runtimeConfigはomitemptyになっているため、空のときはスキップされる
    if n.RuntimeConfig != nil {
        n.Delegate["runtimeConfig"] = n.RuntimeConfig
    }

    return doCmdAdd(args, n, fenv)
}

// doCmdAddの実装
func doCmdAdd(args *skel.CmdArgs, n *NetConf, fenv *subnetEnv) error {
// テストだとcni-flannelなどを渡している
    n.Delegate["name"] = n.Name

// キーが存在しない場合はデフォルトでブリッジプラグインを選択
    if !hasKey(n.Delegate, "type") {
        n.Delegate["type"] = "bridge"
    }

// 
    if !hasKey(n.Delegate, "ipMasq") {
        // if flannel is not doing ipmasq, we should
        // subnetEnv構造体のipmasqを取得し(flannelの設定のデフォルトではTrueっぽい)、反転させて代入
        ipmasq := !*fenv.ipmasq
        n.Delegate["ipMasq"] = ipmasq
    }

// subnetEnv構造体の値を参照して代入
    if !hasKey(n.Delegate, "mtu") {
        mtu := fenv.mtu
        n.Delegate["mtu"] = mtu
    }

// ブリッジタイプが指定されている場合はisGatewayをtrueにする
    if n.Delegate["type"].(string) == "bridge" {
        if !hasKey(n.Delegate, "isGateway") {
            n.Delegate["isGateway"] = true
        }
    }

// CNIVersionが0でなければDelegateにも同様に設定
    if n.CNIVersion != "" {
        n.Delegate["cniVersion"] = n.CNIVersion
    }

// netconf構造体にipamの値が存在すれば入力の値を使用し、置換もしくは補完する
    ipam, err := getDelegateIPAM(n, fenv)
    if err != nil {
        return fmt.Errorf("failed to assemble Delegate IPAM: %w", err)
    }
    n.Delegate["ipam"] = ipam
    fmt.Fprintf(os.Stderr, "\n%#v\n", n.Delegate)

    return delegateAdd(args.ContainerID, n.DataDir, n.Delegate)
}

func delegateAdd(cid, dataDir string, netconf map[string]interface{}) error {
// netconfのマーシャル
    netconfBytes, err := json.Marshal(netconf)
    fmt.Fprintf(os.Stderr, "delegateAdd: netconf sent to delegate plugin:\n")
    os.Stderr.Write(netconfBytes)
    if err != nil {
        return fmt.Errorf("error serializing delegate netconf: %v", err)
    }

// cmdDel用に一時NetConfの保存
    // save the rendered netconf for cmdDel
    if err = saveScratchNetConf(cid, dataDir, netconfBytes); err != nil {
        return err
    }

// 指定されたプラグインでADDを実行する
    result, err := invoke.DelegateAdd(context.TODO(), netconf["type"].(string), netconfBytes, nil)
    if err != nil {
        err = fmt.Errorf("failed to delegate add: %w", err)
        return err
    }
    return result.Print()
}

cmdDel()

ADDと同じ引数を渡し、コンテナを削除する。
最後にdelegateDelを呼び出してブリッジインターフェースを削除している。
doCmdAdd()でsaveScratchNetConf()を呼び、一時的に保存したデータを削除するところまでがワンセット。

func cmdDel(args *skel.CmdArgs) error {
// 標準入出力から値を読み込みパースする
    nc, err := loadFlannelNetConf(args.StdinData)
    if err != nil {
        return err
    }

// runtimeConfigをロードする
    if nc.RuntimeConfig != nil {
// nc.Delegateの値を代入する必要があるのでmakeし領域を確保
        if nc.Delegate == nil {
            nc.Delegate = make(map[string]interface{})
        }
        nc.Delegate["runtimeConfig"] = nc.RuntimeConfig
    }

    return doCmdDel(args, nc)
}

// saveScratchNetConf で保存したファイルをロードして利用
func consumeScratchNetConf(containerID, dataDir string) (func(error), []byte, error) {
    path := filepath.Join(dataDir, containerID)

    // cleanup will do clean job when no error happens in consuming/using process
    cleanup := func(err error) {
        if err == nil {
            // Ignore errors when removing - Per spec safe to continue during DEL
            _ = os.Remove(path)
        }
    }
    netConfBytes, err := os.ReadFile(path)

    return cleanup, netConfBytes, err
}

func doCmdDel(args *skel.CmdArgs, n *NetConf) error {
    cleanup, netConfBytes, err := consumeScratchNetConf(args.ContainerID, n.DataDir)
    if err != nil {
        if os.IsNotExist(err) {
            // Per spec should ignore error if resources are missing / already removed
            return nil
        }
        return err
    }

// deferなので最後にerrの発生がなければクリーンアップ(保存されているファイルの削除)が実行される
    // cleanup will work when no error happens
    defer func() {
        cleanup(err)
    }()

// 保存されているファイルを読み込み
    nc := &types.NetConf{}
    if err = json.Unmarshal(netConfBytes, nc); err != nil {
        // Interface will remain in the bridge but will be removed when rebooting the node
        fmt.Fprintf(os.Stderr, "failed to parse netconf: %v", err)
        return nil
    }

// 
    return invoke.DelegateDel(context.TODO(), nc.Type, netConfBytes, nil)
}

cmdCheck()

func cmdCheck(args *skel.CmdArgs) error {
    // TODO: implement
    return nil
}

まとめてきなやつ

あまりにもわからないのでとりあえず他のCNIプラグイン調べるか…と思い、flannel-io/cni-plugin を読んで正解だった。かなり理解が進んだ。
containernetworkingのパッケージにあるinvokeをなぜ呼び出しているのか?について調べたところ、どうやらインターフェースの作成をしてくれるものだったらしい。

www.cni.dev

flannelは前段で設計ファイルのパース→バリデーションを行い、実行時に問題のない形式にしている、ということが理解できた。
参考にしつつ、小さいCNIプラグインをまずは作ってみようと思う。

余談だが、k8sが出た当初、ネットワークはここまで自由ではなかったらしい。

終わりに

k8sは軽く勉強はしたが、実際にコアな部分(といっていいかはわからないが)に近いところに触れることができてとても面白い。
k8sのネットワークはどうやらプラガブルにごちょごちょできる、ということを知っていきなり触り始めたので、だいぶわからないところが多くてとても良い。
ただ、CNIプラグインを自作してる人がなかなかおらず、ブログもほとんど見つからなかった。クラスターネットワークの自作は…と思ったけど流石にあまりいなさそうな気もする。
ともあれ、仕様書、OSSになっている各種CNIプラグインソースコードといった、ナレッジとドキュメントがある。
読めばどうとでもなるので、脳筋でいけそう!ということがわかった。ひとまず続けてみる。