Kubernetes Management with Rust - A Dive into Generic Client-Go, Controller Abstractions, and CRD Macros with Kube.rs

Kubernetes Management with Rust - A Dive into Generic Client-Go, Controller Abstractions, and CRD Macros with Kube.rs

If you've read our previous blog on performing CRUD operations on Kubernetes using Golang, you should now be familiar with the basics of client-go. In this post, we will explore a more efficient way to use Rust with Kubernetes.

A Rust client for Kubernetes, found at kube-rs/kube, is designed similarly to the more general client-go. It incorporates a runtime abstraction modeled after controller-runtime and includes a derive macro for Custom Resource Definitions (CRDs) inspired by Kubebuilder. This project is hosted by the Cloud Native Computing Foundation (CNCF) as a Sandbox Project.

These crates extend Kubernetes' API machinery and API principles to support generic abstractions. These abstractions make it easier to develop Rust-based applications by enabling the use of reflectors, controllers, and custom resource interfaces.

Prerequisites

Before we dive into using Rust with Kubernetes, make sure you have the following set up:

  • Rust installed on your machine.

  • minikube or an alternative Kubernetes distribution installed.

  • kubectl installed.

Part 1 - List all Pods in the Cluster

In this section, we'll learn the basics of using the kubernetes rust client, by listing down all running pods in our cluster.

Step 1 - Initialize the project

For this, let's use Cargo to initiate the project:

cargo init kubers-demo

You can view the default project structure using the following command:

tree .

Output:

.
├── Cargo.toml
└── src
    └── main.rs

2 directories, 2 files

Step 2 - Update cargo package manager dependencies

Update the list of dependencies in the default kubers-demo/Cargo.toml file:

[package]
name = "kubers-demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
kube = { version = "0.87.2", features = ["client","runtime", "derive", "rustls-tls"] }
k8s-openapi = { version = "0.20.0", features = ["latest"] }
tokio = { version = "1.0", features = ["full"] }  # Use the latest version
[dev-dependencies]
k8s-openapi = { version = "0.20.0", features = ["latest"] }
async-std = "1.0"  # Use the latest version

Step 3 - Update main.rs

Update kubers-demo/src/main.rs, which contains our first simple program demonstrating how to interact with the Kubernetes API to list all pods within the default namespace.

use kube::{Client, Api};
use kube::api::ListParams;
use k8s_openapi::api::core::v1::Pod;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Load the kubeconfig file.
    let config = kube::Config::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await?;
    let client = Client::try_from(config)?;

    // Work with Kubernetes API.
    let pods: Api<Pod> = Api::default_namespaced(client);
    let lp = ListParams::default();

    for p in pods.list(&lp).await? {
        println!("Found Pod: {}", p.metadata.name.unwrap_or_default());
    }

    Ok(())
}

Code Explanation:

Let's break down the above code to understand how our first simple program demonstrates interacting with the Kubernetes API to list all pods within the default namespace. Here's a step-by-step explanation:

  1. Imports: The necessary crates and modules are imported:

    • kube::{Client, Api}: Provides the main Kubernetes client and the API wrapper.

    • kube::api::ListParams: Allows parameterizing list queries.

    • k8s_openapi::api::core::v1::Pod: Defines the Pod resource from the Kubernetes core API.

    • std::error::Error: Handles error types.

  2. Main Function: The main function is declared with the #[tokio::main] attribute to enable asynchronous execution using the Tokio runtime.

     #[tokio::main]
     async fn main() -> Result<(), Box<dyn Error>> {
    
  1. Load Kubeconfig: The Kubernetes configuration is loaded from the default kubeconfig file (typically found at ~/.kube/config).

     let config = kube::Config::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await?;
    
  1. Create Client: A Kubernetes client is created from the loaded configuration.

     let client = Client::try_from(config)?;
    
  1. Define API for Pods: An Api object for Pod resources is created, scoped to the default namespace using the client.

     let pods: Api<Pod> = Api::default_namespaced(client);
    
  1. List Parameters: Default parameters for listing resources are set up using ListParams.

     let lp = ListParams::default();
    
  1. List Pods:The code lists the Pods in the default namespace using the list method on the pods API object. It iterates over the list of Pods and prints their names.

     for p in pods.list(&lp).await? {
         println!("Found Pod: {}", p.metadata.name.unwrap_or_default());
     }
    
  1. Return: The function returns Ok(()) to signify successful execution.

     Ok(())
     }
    

Initializes a connection to a Kubernetes cluster using the local kubeconfig file, then lists all pods in the default namespace and prints their names. The use of async/await with the Tokio runtime allows for efficient, non-blocking interaction with the Kubernetes API.

Step 4 - Deploy a sample nginx application

kubectl apply -f https://k8s.io/examples/application/deployment.yaml

Step 5 - Test the program

Use the following command to list down all the current pods in our Kubernetes cluster:

cargo run -- --kubers-demo kubectl -- get po

You'll get the following output:


Finished `dev` profile [unoptimized + debuginfo] target(s) in 3.77s
Running `target/debug/k8s-client --kubers-demo kubectl -- get po`
Found Pod: demo-nginx
Found Pod: crud-4vj57
Found Pod: nginx-deployment-86dcfdf4c6-cnlp7
Found Pod: nginx-deployment-86dcfdf4c6-vskvc
...

Part 2 - Building a Custom Resource Definition (CRD)

Kubernetes Custom Resource Definitions (CRDs) let you extend the Kubernetes API by defining your own custom objects. These objects can represent any kind of resource specific to your applications or workflows, such as database configurations, networking policies, or even complex deployment patterns. CRDs provide a way to manage these custom resources using the familiar Kubernetes tools and APIs, bringing consistency and automation to your Kubernetes environment.

In this section, we'll use the knowledge from the previous section to build a custom resource definition and then a custom resource for our Kubernetes cluster, using the rust client.

Step 1 - Initialize a new project

Let us initialize a new project for this part. Use the following command to do so:

cargo init kube-crd

Step 2 - Create custom resource definition schema

Add the following code in src/main.rs:

use kube::{Api, Client};
use kube::api::PostParams;
use serde::{Serialize, Deserialize};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{
    CustomResourceDefinition,
    CustomResourceDefinitionSpec,
    CustomResourceDefinitionNames,
    CustomResourceDefinitionVersion,
    JSONSchemaProps,
    CustomResourceValidation,
    JSONSchemaPropsOrArray,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use schemars::JsonSchema;
use kube::CustomResource;
use std::collections::BTreeMap;

// Define the spec of our custom resource
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(group = "example.com", version = "v1", kind = "KCDTrack2", namespaced)]
pub struct MeetupSpec {
    organizer: String,
    topic: String,
    attendees: Vec<String>,
    conference: String,
    time: String,
    session_type: String,
    speaker: String,
}

// Main function to create the CRD in the cluster
#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;

    let crds: Api<CustomResourceDefinition> = Api::all(client);
    let pp = PostParams::default();

    // Define the CRD for our KCDTrack2 resource
    let kcd_crd = CustomResourceDefinition {
        metadata: ObjectMeta {
            name: Some("kcdtrack2s.example.com".to_string()),
            ..Default::default()
        },
        spec: CustomResourceDefinitionSpec {
            group: "example.com".to_string(),
            versions: vec![
                CustomResourceDefinitionVersion {
                    name: "v1".to_string(),
                    served: true,
                    storage: true,
                    schema: Some(CustomResourceValidation {
                        open_api_v3_schema: Some(JSONSchemaProps {
                            type_: Some("object".to_string()),
                            properties: Some({
                                let mut props = BTreeMap::new();
                                props.insert("spec".to_string(), JSONSchemaProps {
                                    type_: Some("object".to_string()),
                                    properties: Some({
                                        let mut spec_props = BTreeMap::new();
                                        spec_props.insert("organizer".to_string(), JSONSchemaProps {
                                            type_: Some("string".to_string()),
                                            ..Default::default()
                                        });
                                        spec_props.insert("topic".to_string(), JSONSchemaProps {
                                            type_: Some("string".to_string()),
                                            ..Default::default()
                                        });
                                        spec_props.insert("attendees".to_string(), JSONSchemaProps {
                                            type_: Some("array".to_string()),
                                            items: Some(JSONSchemaPropsOrArray::Schema(Box::new(JSONSchemaProps {
                                                type_: Some("string".to_string()),
                                                ..Default::default()
                                            }))),
                                            ..Default::default()
                                        });
                                        spec_props.insert("conference".to_string(), JSONSchemaProps {
                                            type_: Some("string".to_string()),
                                            ..Default::default()
                                        });
                                        spec_props.insert("time".to_string(), JSONSchemaProps {
                                            type_: Some("string".to_string()),
                                            ..Default::default()
                                        });
                                        spec_props.insert("session_type".to_string(), JSONSchemaProps {
                                            type_: Some("string".to_string()),
                                            ..Default::default()
                                        });
                                        spec_props.insert("speaker".to_string(), JSONSchemaProps {
                                            type_: Some("string".to_string()),
                                            ..Default::default()
                                        });
                                        spec_props
                                    }),
                                    ..Default::default()
                                });
                                props
                            }),
                            ..Default::default()
                        }),
                    }),
                    ..Default::default()
                }
            ],
            names: CustomResourceDefinitionNames {
                plural: "kcdtrack2s".to_string(),
                singular: Some("kcdtrack2".to_string()),
                kind: "KCDTrack2".to_string(),
                short_names: Some(vec!["kcdt2".to_string()]),
                ..Default::default()
            },
            scope: "Namespaced".to_string(),
            ..Default::default()
        },
        status: None,
    };

    // Create the CRD
    crds.create(&pp, &kcd_crd).await?;

    Ok(())
}

Step 3 - Update cargo package manager dependencies

Add the following code in kube-crd/Cargo.toml file:

[package]
name = "kube-crd"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
schemars = "0.8.15"
kube = { version = "0.87.1", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["latest"] }
tokio = { version = "1.0", features = ["full"] }  # Use the latest version
serde = "1.0.155"
serde_derive = "1.0.155"
serde_json = "1.0.94"
serde_yaml = "0.9.19"

Step 4 - Run the program

cargo run

Output:

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.07s
Running `target/debug/kube-crd`

Step 5 - Verify CRD creation

kubectl get crd

If everything goes well, you will get the following output:

NAME                     CREATED AT
kcdtrack2s.example.com   2024-06-23T04:46:25Z

Step 6 - Create a new custom resource

Create a new k8s manifest file using vi kube-crd/kcdhyd.yaml and add the following configuration to create a custom resource, based on our CRD:

apiVersion: example.com/v1
kind: KCDTrack2
metadata:
  name: integrating-rust
spec:
  organizer: "kcdhyd"
  topic: "Building the Bridge: Integrating Rust with Kubernetes Controller Runtime"
  attendees: []
  conference: "KCD Hyderabad"
  time: "15:55 - 16:20"
  session_type: "Session"
  speaker: "Sangam Biradar, CloudNativeFolks"

Step 7 - Apply the CRD yaml

kubectl apply -f kcdhyd.yaml

Output:

kcdtrack2.example.com/integrating-rust created

Step 8 - Print a specific jsonpath of CRD

kubectl get kcdtrack2 integrating-rust -o jsonpath='{.spec.speaker}'

Output:

Sangam Biradar, CloudNativeFolks

Part 3 - Monitor Kubernetes pods and send updates to the slack channel

In this section, we'll use what we've learned from the above sections, to create a program that watches Kubernetes pod events using the kube runtime, filter for changes, and trigger Slack notifications when pod statuses change.

Step 1 - Initialize a new project

Let us initialize a new project called slack-integration:

cargo init slack-integration

Step 2 - Update cargo package manager dependencies

Update the default slack-integration/Cargo.toml with the following configuration:

[package]
name = "slack-integration"
version = "0.1.0"
edition = "2021"

[dependencies]
kube = { version = "0.87.2", features = ["runtime"] }
kube-runtime = "0.87.2"
k8s-openapi = { version = "0.20.0", features = ["latest"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11.23", features = ["json"] }  # Enable the json feature
schemars = "0.8"
futures-util = "0.3.30"

Step 3 - Create the program

Let us first import all the necessary libraries to manage Kubernetes resources, make HTTP requests, handle JSON data, and support asynchronous programming.

Add the following libraries in src/main.rs:

use kube::{Client, Api};
use kube::runtime::watcher;
use k8s_openapi::api::core::v1::Pod;
use tokio;
use reqwest;
use serde_json::json;
use futures_util::TryStreamExt;
use tokio::sync::mpsc;

Next up, we'll create a new function called send_slack_message, which would be responsible for sending messages to Slack.

async fn send_slack_message(client: &reqwest::Client, webhook_url: &str, messages: Vec<String>) {
    let payload = json!({ "text": messages.join("\n") });
    if let Err(e) = client.post(webhook_url)
        .json(&payload)
        .send()
        .await {
        eprintln!("Failed to send message to Slack: {}", e);
    }
}

Let us now create the main function for our program:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::all(client);
    let watcher = watcher(pods, Default::default());

    let slack_webhook_url = "https://hooks.slack.com/services/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"; // Replace with your Slack webhook URL
    let reqwest_client = reqwest::Client::new();

    let (tx, mut rx) = mpsc::channel(100);

    // Task to batch and send Slack messages periodically
    let webhook_task = tokio::spawn(async move {
        let mut messages = Vec::new();
        while let Some(message) = rx.recv().await {
            messages.push(message);
            if messages.len() >= 10 {
                send_slack_message(&reqwest_client, slack_webhook_url, messages.split_off(0)).await;
            }
        }
        if !messages.is_empty() {
            send_slack_message(&reqwest_client, slack_webhook_url, messages).await;
        }
    });

    tokio::pin!(watcher);
    while let Some(event) = watcher.try_next().await? {
        if let watcher::Event::Applied(pod) = event {
            let pod_name = pod.metadata.name.unwrap_or_default();
            let message = format!("Pod update: {}", pod_name);
            tx.send(message).await?;
        }
    }

    drop(tx); // Close the channel to stop the webhook task
    webhook_task.await?;

    Ok(())
}

Here's a breakdown of the core logic:

  • Tokio Main Function:

    • Uses #[tokio::main] to enable asynchronous execution.
  • Kubernetes Client and Watcher:

    • Initializes a Kubernetes client with Client::try_default().await?.

    • Sets up a watcher for all Pods with Api::all(client) and watcher(pods, Default::default()).

  • Slack Webhook Setup:

    • Defines the Slack webhook URL to send notifications.

    • Creates an HTTP client using reqwest::Client::new().

  • Message Channel:

    • Creates a channel for message passing with mpsc::channel(100).
  • Batching and Sending Slack Messages:

    • Spawns a task to batch and send messages asynchronously.

    • Collects messages and sends them in batches to Slack.

  • Processing Pod Events:

    • Watches for pod events and extracts pod names.

    • Sends formatted messages to the channel when a pod event occurs.

  • Cleanup:

    • Closes the channel to stop the message-sending task.

    • Waits for the task to complete before exiting the program.

Step 4 - Slack integration

Create a new Slack app by heading over to Slack Apps and enable the "app incoming webhooks", as shown below. Additionally, copy webhook URL to add in our program.

After this, make sure to paste the Slack webhook URL in your code (in slack-integration/src/main.rs):

let slack_webhook_url = "Slack webhook URL here"; // Replace with your Slack webhook URL

Step 5 - Run the program and deploy the application

cargo run

Output:

Compiling slack-integration v0.1.0 (/Users/sangambiradar/Documents/GitHub/kubers-demo/slack-integration)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.24s
Running `/Users/sangambiradar/Documents/GitHub/kubers-demo/target/debug/slack-integration`
...

In another terminal, deploy any application using a sample YAML:

kubectl apply -f nginx2.yaml

You will see real-time updates on the Slack channel 🎉

Conclusion

We explored how to efficiently use Rust with Kubernetes by leveraging the kube-rs crate. We started with setting up the Rust environment, Minikube, and initializing a Rust project using Cargo. We demonstrated how to interact with the Kubernetes API to list all pods within the default namespace. Additionally, we covered creating a Custom Resource Definition (CRD) in Rust, including defining custom resources, building CRDs, and verifying their creation in Kubernetes.

Furthermore, we built a simple Rust application that monitors Kubernetes pods and sends updates to a Slack channel using a webhook. This application demonstrated asynchronous programming with Rust's Tokio runtime and efficient communication with Slack through batched messages.

Resources

Follow Kubesimplify on Hashnode, Twitter/X and LinkedIn. Join our Discord server to learn with us!

Did you find this article valuable?

Support Kubesimplify by becoming a sponsor. Any amount is appreciated!