8000 feat: add resources to logs collector daemonset for k8s api server access by tedim52 · Pull Request #2663 · kurtosis-tech/kurtosis · GitHub

Merged · 13 commits · Mar 12, 2025
@@ -4,20 +4,17 @@ import (
 	"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/object_attributes_provider/docker_labels_for_logs"
 )
 
-// These are the list of container's labels that the Docker's logging driver (currently, the Fluentd logging driver)
-// will add into the logs stream when it sends them to the destination (currently, Fluentbit logs collector)
-type LogsCollectorLabels []string
-
-func GetKurtosisTrackedLogsCollectorLabels() LogsCollectorLabels {
-	var logsCollectorLabels LogsCollectorLabels
-
-	allLogsDatabaseKurtosisTrackedDockerLabelsSet := docker_labels_for_logs.GetAllLogsDatabaseKurtosisTrackedDockerLabels()
-
-	for _, dockerLabelKey := range allLogsDatabaseKurtosisTrackedDockerLabelsSet {
+// GetLabelsForLogsTrackingLogsOfUserServiceContainers returns the list of labels to add to Kurtosis user service containers.
+// Docker's logging driver (currently, the Fluentd logging driver) will add them into the logs stream forwarded to the logs collector.
+func GetLabelsForLogsTrackingLogsOfUserServiceContainers() []string {
+	var labels []string
+
+	dockerLabelsForLogsStream := docker_labels_for_logs.GetDockerLabelsForLogStream()
+
+	for _, dockerLabelKey := range dockerLabelsForLogsStream {
 		logsCollectorLabel := dockerLabelKey.GetString()
-		logsCollectorLabels = append(logsCollectorLabels, logsCollectorLabel)
+		labels = append(labels, logsCollectorLabel)
 	}
 
-	return logsCollectorLabels
+	return labels
 }
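These labels only reach the logging pipeline if Docker's fluentd logging driver is told to forward them. A minimal sketch of that wiring, using the Docker SDK's container.LogConfig; the helper name, label values, and fluentd address are illustrative stand-ins, not the actual Kurtosis wiring:

package main

import (
	"fmt"
	"strings"

	"github.com/docker/docker/api/types/container"
)

// buildFluentdLogConfig is a hypothetical helper: the fluentd log driver only
// forwards the container labels named in its "labels" option, passed as a
// comma-separated list, so the tracked label keys must be joined into it.
func buildFluentdLogConfig(labelKeys []string, fluentdAddress string) container.LogConfig {
	return container.LogConfig{
		Type: "fluentd",
		Config: map[string]string{
			"labels":          strings.Join(labelKeys, ","),
			"fluentd-address": fluentdAddress,
		},
	}
}

func main() {
	// Illustrative label keys; the real ones come from the docker_label_key package.
	labels := []string{"com.kurtosistech.container-type", "com.kurtosistech.logs-enclave-uuid"}
	cfg := buildFluentdLogConfig(labels, "127.0.0.1:24224")
	fmt.Println(cfg.Config["labels"])
}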
@@ -219,7 +219,7 @@ func StartRegisteredUserServices(
 	}
 
 	// The following docker labels will be added into the logs stream, which is necessary for filtering and retrieving persisted logs
-	logsCollectorLabels := logs_collector_functions.GetKurtosisTrackedLogsCollectorLabels()
+	logsCollectorLabels := logs_collector_functions.GetLabelsForLogsTrackingLogsOfUserServiceContainers()
 
 	successfulStarts, failedStarts, err := runStartServiceOperationsInParallel(
 		ctx,
@@ -466,7 +466,7 @@ func runStartServiceOperationsInParallel(
 	dockerManager *docker_manager.DockerManager,
 	restartPolicy docker_manager.RestartPolicy,
 	logsCollectorAddress string,
-	logsCollectorLabels logs_collector_functions.LogsCollectorLabels,
+	logsCollectorLabels []string,
 ) (
 	map[service.ServiceUUID]*service.Service,
 	map[service.ServiceUUID]error,
@@ -529,7 +529,7 @@ func createStartServiceOperation(
 	dockerManager *docker_manager.DockerManager,
 	restartPolicy docker_manager.RestartPolicy,
 	logsCollectorAddress string,
-	logsCollectorLabels logs_collector_functions.LogsCollectorLabels,
+	logsCollectorLabels []string,
 ) operation_parallelizer.Operation {
 	id := serviceRegistration.GetName()
 	privateIpAddr := serviceRegistration.GetPrivateIP()
@@ -6,15 +6,14 @@ import (
 
 // The following docker labels will be added into the logs stream
 // These are necessary for propagating information for log filtering and retrieval through the logging pipeline
-var LogsDatabaseKurtosisTrackedDockerLabelsForIdentifyLogsStream = []*docker_label_key.DockerLabelKey{
+var DockerLabelsForLogsStream = []*docker_label_key.DockerLabelKey{
 	docker_label_key.ContainerTypeDockerLabelKey,
 	docker_label_key.LogsEnclaveUUIDDockerLabelKey,
 	docker_label_key.LogsServiceUUIDDockerLabelKey,
 	docker_label_key.LogsServiceShortUUIDDockerLabelKey,
 	docker_label_key.LogsServiceNameDockerLabelKey,
 }
 
-// These are all the logs database Kurtosis tracked Docker Labels used
-func GetAllLogsDatabaseKurtosisTrackedDockerLabels() []*docker_label_key.DockerLabelKey {
-	return LogsDatabaseKurtosisTrackedDockerLabelsForIdentifyLogsStream
+func GetDockerLabelsForLogStream() []*docker_label_key.DockerLabelKey {
+	return DockerLabelsForLogsStream
 }
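The docker_label_key package itself is not part of this diff; as a rough sketch of the shape implied by the GetString() calls above (the constructor, validation, and key string are assumptions, not the real implementation):

package docker_label_key

import "fmt"

// DockerLabelKey wraps a raw label-key string so that every key used in the
// logging pipeline is constructed and validated in one place.
type DockerLabelKey struct {
	key string
}

// GetString returns the raw label key, as consumed by the loops above.
func (k *DockerLabelKey) GetString() string {
	return k.key
}

// mustCreateNewDockerLabelKey is a hypothetical constructor: label keys must
// be non-empty, so invalid keys fail loudly at package init time.
func mustCreateNewDockerLabelKey(key string) *DockerLabelKey {
	if key == "" {
		panic(fmt.Sprintf("invalid docker label key: %q", key))
	}
	return &DockerLabelKey{key: key}
}

// Illustrative example of how ContainerTypeDockerLabelKey might be defined.
var ContainerTypeDockerLabelKey = mustCreateNewDockerLabelKey("com.kurtosistech.container-type")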
@@ -48,7 +48,7 @@ func CreateLogsCollector(
 		logrus.Debug("Found existing logs collector daemon set.")
 	} else {
 		logrus.Debug("Did not find existing log collector, creating one...")
-		daemonSet, configMap, namespace, removeLogsCollectorFunc, err := logsCollectorDaemonSet.CreateAndStart(
+		daemonSet, configMap, namespace, serviceAccount, clusterRole, clusterRoleBinding, removeLogsCollectorFunc, err := logsCollectorDaemonSet.CreateAndStart(
 			ctx,
 			"", // TODO: fill these in when adding aggregator to k8s
 			0,  // TODO: fill these in when adding aggregator to k8s
@@ -77,9 +77,12 @@
 	}()
 
 	kubernetesResources = &logsCollectorKubernetesResources{
-		daemonSet: daemonSet,
-		configMap: configMap,
-		namespace: namespace,
+		daemonSet:          daemonSet,
+		configMap:          configMap,
+		serviceAccount:     serviceAccount,
+		clusterRoleBinding: clusterRoleBinding,
+		clusterRole:        clusterRole,
+		namespace:          namespace,
 	}
 
 	logsCollectorObj, err = getLogsCollectorsObjectFromKubernetesResources(ctx, kubernetesManager, kubernetesResources)
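The new serviceAccount, clusterRole, and clusterRoleBinding returned by CreateAndStart are what give the daemonset's pods read access to the k8s API server, which the Fluent Bit kubernetes filter added further down needs. A sketch of the minimal RBAC shape using client-go's rbacv1 types; the object names and exact rules are assumptions, not necessarily what CreateAndStart produces:

package main

import (
	corev1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Illustrative names; the real ones come from Kurtosis' object attributes providers.
const (
	collectorNamespace          = "kurtosis-logs-collector"
	collectorServiceAccountName = "logs-collector-service-account"
	collectorClusterRoleName    = "logs-collector-cluster-role"
)

func buildCollectorRBAC() (*corev1.ServiceAccount, *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding) {
	serviceAccount := &corev1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{Name: collectorServiceAccountName, Namespace: collectorNamespace},
	}

	// The kubernetes filter looks up pod metadata for the containers it tails,
	// so it needs cluster-wide read access to pods (and commonly namespaces).
	clusterRole := &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{Name: collectorClusterRoleName},
		Rules: []rbacv1.PolicyRule{{
			APIGroups: []string{""},
			Resources: []string{"pods", "namespaces"},
			Verbs:     []string{"get", "list", "watch"},
		}},
	}

	// Bind the role to the daemonset's service account.
	clusterRoleBinding := &rbacv1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: collectorClusterRoleName + "-binding"},
		Subjects: []rbacv1.Subject{{
			Kind:      rbacv1.ServiceAccountKind,
			Name:      collectorServiceAccountName,
			Namespace: collectorNamespace,
		}},
		RoleRef: rbacv1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     collectorClusterRoleName,
		},
	}

	return serviceAccount, clusterRole, clusterRoleBinding
}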
@@ -2,6 +2,7 @@ package logs_collector_functions
 
 import (
 	"context"
+	"errors"
 	"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager"
 	"github.com/kurtosis-tech/stacktrace"
 	"github.com/sirupsen/logrus"
@@ -24,22 +25,53 @@ func DestroyLogsCollector(ctx context.Context, kubernetesManager *kubernetes_manager.KubernetesManager) error {
 		return nil
 	}
 
-	var destroyErr error
+	var destroyErrs []error
 	if logsCollectorResources.daemonSet != nil {
 		if err := kubernetesManager.RemoveDaemonSet(ctx, logsCollectorNamespace.Name, logsCollectorResources.daemonSet); err != nil {
-			destroyErr = stacktrace.Propagate(err, "An error occurred removing logs collector daemon set.")
+			destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred removing logs collector daemon set."))
 		}
 	}
 
 	if logsCollectorResources.configMap != nil {
 		if err := kubernetesManager.RemoveConfigMap(ctx, logsCollectorNamespace.Name, logsCollectorResources.configMap); err != nil {
-			destroyErr = stacktrace.Propagate(err, "An error occurred removing logs collector config map.")
+			destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred removing logs collector config map."))
 		}
 	}
 
+	if logsCollectorResources.serviceAccount != nil {
+		if err := kubernetesManager.RemoveServiceAccount(ctx, logsCollectorResources.serviceAccount); err != nil {
+			destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred removing logs collector service account."))
+		}
+	}
+
+	if logsCollectorResources.clusterRole != nil {
+		if err := kubernetesManager.RemoveClusterRole(ctx, logsCollectorResources.clusterRole); err != nil {
+			destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred removing logs collector cluster role."))
+		}
+	}
+
+	if logsCollectorResources.clusterRoleBinding != nil {
+		if err := kubernetesManager.RemoveClusterRoleBindings(ctx, logsCollectorResources.clusterRoleBinding); err != nil {
+			destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred removing logs collector cluster role binding."))
		}
+	}
+
 	if err := kubernetesManager.RemoveNamespace(ctx, logsCollectorNamespace); err != nil {
-		destroyErr = stacktrace.Propagate(err, "An error occurred removing logs collector namespace.")
+		destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred removing logs collector namespace."))
 	}
 
-	return destroyErr
+	if err := waitForNamespaceRemoval(ctx, logsCollectorNamespace.Name, kubernetesManager); err != nil {
+		destroyErrs = append(destroyErrs, stacktrace.Propagate(err, "An error occurred waiting for logs collector namespace to be removed."))
+	}
+
+	if len(destroyErrs) == 0 {
+		return nil
+	}
+
+	errMsg := "The following errors occurred trying to destroy the logs collector:\n"
+	for _, destroyErr := range destroyErrs {
+		errMsg += destroyErr.Error() + "\n"
+	}
+
+	return errors.New(errMsg)
 }
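As an aside, on Go 1.20+ this accumulate-then-format pattern can be expressed with the standard library's errors.Join, which returns nil when no step failed, so the success path needs no special casing. A sketch under that assumption; teardownStep and destroyAll are illustrative, not the Kurtosis API:

package main

import (
	"errors"
	"fmt"
)

// teardownStep pairs a human-readable name with a removal function.
type teardownStep struct {
	name string
	fn   func() error
}

// destroyAll runs every step even if earlier ones fail (best-effort teardown),
// then folds all failures into a single error via errors.Join.
func destroyAll(steps []teardownStep) error {
	var destroyErrs []error
	for _, step := range steps {
		if err := step.fn(); err != nil {
			destroyErrs = append(destroyErrs, fmt.Errorf("removing %s: %w", step.name, err))
		}
	}
	return errors.Join(destroyErrs...)
}

func main() {
	err := destroyAll([]teardownStep{
		{name: "daemon set", fn: func() error { return nil }},
		{name: "config map", fn: func() error { return errors.New("boom") }},
	})
	fmt.Println(err) // removing config map: boom
}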
@@ -29,9 +29,13 @@ const (
 	fluentBitCheckpointDbVolumeName = "fluent-bit-db"
 	fluentBitCheckpointDbMountPath  = "/var/log/fluent-bit/db"
 
+	// Assume the default k8s API server URL. This may not hold for heavily customized k8s environments,
+	// so keep it as a variable in case it needs to become user-configurable down the line.
+	k8sApiServerUrl = "https://kubernetes.default.svc:443"
+
 	// TODO: construct fluentbit config via go templating based on inputs
 	fluentBitConfigFileName = "fluent-bit.conf"
-	fluentBitConfigStr      = `
+	fluentBitConfigFmtStr   = `
 [SERVICE]
 	HTTP_Server On
 	HTTP_Listen 0.0.0.0
@@ -47,6 +51,15 @@
 	DB.sync normal
 	Read_from_Head true
 
+[FILTER]
+	Name kubernetes
+	Match *
+	Kube_URL %v
+	Merge_Log On
+	Keep_Log On
+	Annotations Off
+	Labels On
+
 [OUTPUT]
 	Name stdout
 	Match *
@@ -58,14 +71,5 @@
 	Path /var/log/fluent-bit
 	File fluent-bit-output.log
 	Format plain
-
-[FILTER]
-	Name kubernetes
-	Match kurtosis.*
-	Merge_Log On
-	Merge_Log_Key On
-	Labels On
-	Annotations On
-	Kube_Tag_Prefix kurtosis.var.log.containers.
 `
 )
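Tying the two new constants together: the %v in the [FILTER] section's Kube_URL line is the only format verb in fluentBitConfigFmtStr, so rendering the final fluent-bit.conf is a single Sprintf. A sketch assuming the constants from the block above are in scope (plus "fmt" imported); the ConfigMap data shape is an assumption about how the rendered config gets mounted:

// renderFluentBitConfig substitutes the API server URL into the config
// template, telling the kubernetes filter where to fetch pod metadata.
func renderFluentBitConfig() map[string]string {
	renderedConfig := fmt.Sprintf(fluentBitConfigFmtStr, k8sApiServerUrl)
	// Keyed by file name, matching how a ConfigMap's data maps to mounted files.
	return map[string]string{fluentBitConfigFileName: renderedConfig}
}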