Skip to content
Merged
22 changes: 16 additions & 6 deletions block/internal/da/forced_inclusion_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ import (
// ErrForceInclusionNotConfigured is returned when the forced inclusion namespace is not configured.
var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not configured")

// ForcedInclusionRetriever handles retrieval of forced inclusion transactions from DA.
type ForcedInclusionRetriever struct {
// ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA.
type ForcedInclusionRetriever interface {
RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error)
Stop()
}

// forcedInclusionRetriever handles retrieval of forced inclusion transactions from DA.
type forcedInclusionRetriever struct {
client Client
logger zerolog.Logger
daEpochSize uint64
Expand All @@ -40,7 +46,7 @@ func NewForcedInclusionRetriever(
logger zerolog.Logger,
cfg config.Config,
daStartHeight, daEpochSize uint64,
) *ForcedInclusionRetriever {
) ForcedInclusionRetriever {
retrieverLogger := logger.With().Str("component", "forced_inclusion_retriever").Logger()

// Create async block retriever for background prefetching
Expand All @@ -54,24 +60,28 @@ func NewForcedInclusionRetriever(
)
asyncFetcher.Start()

return &ForcedInclusionRetriever{
base := &forcedInclusionRetriever{
client: client,
logger: retrieverLogger,
daStartHeight: daStartHeight,
daEpochSize: daEpochSize,
asyncFetcher: asyncFetcher,
}
if cfg.Instrumentation.IsTracingEnabled() {
return withTracingForcedInclusionRetriever(base)
}
return base
}

// Stop stops the background prefetcher.
func (r *ForcedInclusionRetriever) Stop() {
func (r *forcedInclusionRetriever) Stop() {
r.asyncFetcher.Stop()
}

// RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height.
// It respects epoch boundaries and only fetches at epoch end.
// It tries to get blocks from the async fetcher cache first, then falls back to sync fetching.
func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
func (r *forcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
// when daStartHeight is not set or no namespace is configured, we retrieve nothing.
if !r.client.HasForcedInclusionNamespace() {
return nil, ErrForceInclusionNotConfigured
Expand Down
54 changes: 54 additions & 0 deletions block/internal/da/forced_inclusion_tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package da

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

var _ ForcedInclusionRetriever = (*tracedForcedInclusionRetriever)(nil)

type tracedForcedInclusionRetriever struct {
inner ForcedInclusionRetriever
tracer trace.Tracer
}

func withTracingForcedInclusionRetriever(inner ForcedInclusionRetriever) ForcedInclusionRetriever {
return &tracedForcedInclusionRetriever{
inner: inner,
tracer: otel.Tracer("ev-node/forced-inclusion"),
}
}

func (t *tracedForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
ctx, span := t.tracer.Start(ctx, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs",
trace.WithAttributes(
attribute.Int64("da.height", int64(daHeight)),
),
)
defer span.End()

event, err := t.inner.RetrieveForcedIncludedTxs(ctx, daHeight)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return event, err
}

if event != nil {
span.SetAttributes(
attribute.Int64("event.start_da_height", int64(event.StartDaHeight)),
attribute.Int64("event.end_da_height", int64(event.EndDaHeight)),
attribute.Int("event.tx_count", len(event.Txs)),
)
}

return event, nil
}

func (t *tracedForcedInclusionRetriever) Stop() {
t.inner.Stop()
}
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type Syncer struct {

// Handlers
daRetriever DARetriever
fiRetriever *da.ForcedInclusionRetriever
fiRetriever da.ForcedInclusionRetriever
p2pHandler p2pHandler

// Forced inclusion tracking
Expand Down
1 change: 1 addition & 0 deletions block/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ForcedInclusionRetriever interface {

// NewForcedInclusionRetriever creates a new forced inclusion retriever.
// It internally creates and manages an AsyncBlockRetriever for background prefetching.
// Tracing is automatically enabled when configured.
func NewForcedInclusionRetriever(
client DAClient,
cfg config.Config,
Expand Down
Loading