Implement OpenTelemetry tracing in the agent

- Added OpenTelemetry tracing support in main.go, including a new function startTracing to initialize the tracer with a configurable endpoint.
- Updated the environment attribute from "testing" to "develop" for better clarity in tracing.
- Integrated tracing into the RTSP connection process in gortsplib.go by creating a span for the Connect method.
- Enhanced the Bootstrap function in Kerberos.go to include tracing, marking the start and end of the bootstrap process.
- Introduced a new span in RunAgent to trace the execution flow and ensure proper span management.
This commit is contained in:
cedricve
2025-06-20 09:35:13 +00:00
parent c50137f255
commit bf35e5efb6
7 changed files with 1571 additions and 69 deletions

View File

@@ -25,4 +25,7 @@ AGENT_KERBEROSVAULT_SECONDARY_URI=
AGENT_KERBEROSVAULT_SECONDARY_PROVIDER=
AGENT_KERBEROSVAULT_SECONDARY_DIRECTORY=
AGENT_KERBEROSVAULT_SECONDARY_ACCESS_KEY=
AGENT_KERBEROSVAULT_SECONDARY_SECRET_KEY=
AGENT_KERBEROSVAULT_SECONDARY_SECRET_KEY=
# OpenTelemetry tracing endpoint (OTLP over HTTP, given as host:port, e.g. localhost:4318). Leave empty to disable tracing.
OTEL_EXPORTER_OTLP_ENDPOINT=

BIN
machinery/__debug_bin2429242235 Executable file

Binary file not shown.

View File

@@ -2,6 +2,8 @@ module github.com/kerberos-io/agent/machinery
go 1.24.2
replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20250519155744-55703ea1f237
require (
github.com/Eyevinn/mp4ff v0.48.0
github.com/InVisionApp/conjungo v1.1.0
@@ -35,6 +37,10 @@ require (
github.com/tevino/abool v1.2.0
github.com/zaf/g711 v1.4.0
go.mongodb.org/mongo-driver v1.17.4
go.opentelemetry.io/otel v1.36.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0
go.opentelemetry.io/otel/sdk v1.36.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
@@ -46,6 +52,7 @@ require (
github.com/bluenviron/mediacommon/v2 v2.2.0 // indirect
github.com/bytedance/sonic v1.13.2 // indirect
github.com/bytedance/sonic/loader v0.2.4 // indirect
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/clbanning/mxj/v2 v2.7.0 // indirect
github.com/cloudwego/base64x v0.1.5 // indirect
@@ -54,6 +61,8 @@ require (
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gin-contrib/sse v1.0.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
@@ -62,9 +71,9 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.26.0 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/icholy/digest v0.1.23 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
@@ -107,15 +116,21 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
github.com/ziutek/mymysql v1.5.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
golang.org/x/arch v0.16.0 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/appengine v1.6.6 // indirect
golang.org/x/tools v0.30.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 // indirect
google.golang.org/grpc v1.72.1 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/ini.v1 v1.42.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

File diff suppressed because it is too large Load Diff

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"time"
@@ -11,6 +12,13 @@ import (
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/onvif"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
configService "github.com/kerberos-io/agent/machinery/src/config"
"github.com/kerberos-io/agent/machinery/src/routers"
@@ -19,8 +27,8 @@ import (
var VERSION = utils.VERSION
/*func startTracing() (*trace.TracerProvider, error) {
serviceName := "product-app"
func startTracing(otelEndpoint string) (*trace.TracerProvider, error) {
serviceName := "agent"
headers := map[string]string{
"content-type": "application/json",
}
@@ -28,7 +36,7 @@ var VERSION = utils.VERSION
exporter, err := otlptrace.New(
context.Background(),
otlptracehttp.NewClient(
otlptracehttp.WithEndpoint("localhost:4318"),
otlptracehttp.WithEndpoint(otelEndpoint),
otlptracehttp.WithHeaders(headers),
otlptracehttp.WithInsecure(),
),
@@ -48,7 +56,7 @@ var VERSION = utils.VERSION
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
attribute.String("environment", "testing"),
attribute.String("environment", "develop"),
),
),
)
@@ -56,7 +64,7 @@ var VERSION = utils.VERSION
otel.SetTracerProvider(tracerprovider)
return tracerprovider, nil
}*/
}
func main() {
@@ -91,16 +99,20 @@ func main() {
log.Log.Init(logLevel, logOutput, configDirectory, timezone)
// Start OpenTelemetry tracing
/*traceProvider, err := startTracing()
if err != nil {
log.Log.Error("traceprovider: " + err.Error())
}
defer func() {
if err := traceProvider.Shutdown(context.Background()); err != nil {
if otelEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); otelEndpoint == "" {
log.Log.Info("main.Main(): No OpenTelemetry endpoint provided, skipping tracing")
} else {
log.Log.Info("main.Main(): Starting OpenTelemetry tracing with endpoint: " + otelEndpoint)
traceProvider, err := startTracing(otelEndpoint)
if err != nil {
log.Log.Error("traceprovider: " + err.Error())
}
}()
_ = traceProvider.Tracer("my-app")*/
defer func() {
if err := traceProvider.Shutdown(context.Background()); err != nil {
log.Log.Error("traceprovider: " + err.Error())
}
}()
}
switch action {

View File

@@ -33,8 +33,11 @@ import (
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/pion/rtp"
"go.opentelemetry.io/otel"
)
var tracer = otel.Tracer("github.com/kerberos-io/agent")
// Implements the RTSPClient interface.
type Golibrtsp struct {
RTSPClient
@@ -103,7 +106,10 @@ func init() {
}
// Connect to the RTSP server.
func (g *Golibrtsp) Connect(ctx context.Context) (err error) {
func (g *Golibrtsp) Connect(ctxRunAgent context.Context) (err error) {
_, span := tracer.Start(ctxRunAgent, "Connect")
defer span.End()
transport := gortsplib.TransportTCP
g.Client = gortsplib.Client{

View File

@@ -9,6 +9,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/cloud"
@@ -23,16 +24,13 @@ import (
"github.com/tevino/abool"
)
//var tracer = otel.Tracer("github.com/Salaton/tracing/pkg/usecases/product")
var tracer = otel.Tracer("github.com/kerberos-io/agent")
func Bootstrap(ctx context.Context, configDirectory string, configuration *models.Configuration, communication *models.Communication, captureDevice *capture.Capture) {
log.Log.Debug("components.Kerberos.Bootstrap(): bootstrapping the kerberos agent.")
/*
_, span := tracer.Start(ctx, "CreateProducBootstrap")
span.End()
*/
_, span := tracer.Start(ctx, "Bootstrap")
// We keep track of the Kerberos Agent uptime.
// This is sent to Kerberos Hub in a heartbeat.
@@ -86,6 +84,8 @@ func Bootstrap(ctx context.Context, configDirectory string, configuration *model
// Configure an MQTT client, which enables bi-directional communication
mqttClient := routers.ConfigureMQTT(configDirectory, configuration, communication)
span.End()
// Run the agent and fire up all the other
// goroutines which do image capture, motion detection, onvif, etc.
for {
@@ -122,6 +122,9 @@ func Bootstrap(ctx context.Context, configDirectory string, configuration *model
func RunAgent(configDirectory string, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, uptimeStart time.Time, cameraSettings *models.Camera, captureDevice *capture.Capture) string {
ctxRunAgent := context.TODO()
_, span := tracer.Start(ctxRunAgent, "RunAgent")
log.Log.Info("components.Kerberos.RunAgent(): Creating camera and processing threads.")
config := configuration.Config
@@ -132,7 +135,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
rtspUrl := config.Capture.IPCamera.RTSP
rtspClient := captureDevice.SetMainClient(rtspUrl)
if rtspUrl != "" {
err := rtspClient.Connect(context.Background())
err := rtspClient.Connect(ctxRunAgent)
if err != nil {
log.Log.Error("components.Kerberos.RunAgent(): error connecting to RTSP stream: " + err.Error())
rtspClient.Close()
@@ -315,6 +318,9 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
// If we reach this point, we have a working RTSP connection.
communication.CameraConnected = true
// End the OpenTelemetry "RunAgent" span before entering the blocking state below.
span.End()
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// This will go into a blocking state; once this channel is triggered,
// the agent will clean up and restart.