
[CELEBORN-2349] Support worker supplied tags and TagsManager hardening #3719

Open
s0nskar wants to merge 5 commits into apache:main from s0nskar:tags_improvements


@s0nskar s0nskar commented Jun 5, 2026

Contributor

What changes were proposed in this pull request?

This PR lets a worker advertise tags at startup and makes tags part of WorkerInfo, so they use the existing Raft replication and worker-lifecycle cleanup. This also fits well with the current implementation: tags from the config service can be merged with worker-supplied tags.

Backward Compatible:

  • The behaviour remains unchanged when no worker-supplied tags are configured.
  • A master-side config, celeborn.tags.worker.registration.enabled (default: true), lets the master ignore worker-supplied tags when set to false.
  • If a worker is running an older version, its worker-supplied tags will be empty.

Why are the changes needed?

TagsManager today works only off the dynamic-config store. There is no way for a worker to declare its own tags, which makes the feature harder to use than it needs to be.

After this PR, a worker can supply its own tags and manage their lifecycle, while still allowing tag overrides via the config service. This should make the worker tags feature much easier to use.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

  • Added Unit Tests.
  • Working on end-to-end testing in local setup.

@s0nskar s0nskar changed the title from "[CELEBORN-2349] Support worker supplied tags during startup and TagsManager hardening" to "[CELEBORN-2349] Support worker supplied tags and TagsManager hardening" Jun 5, 2026

s0nskar commented Jun 5, 2026

Contributor Author

@SteNicholas can you please trigger Copilot magic on this.

Copilot AI left a comment


Pull request overview

This PR extends Celeborn’s worker tagging mechanism by allowing workers to advertise tags during registration, persisting those tags as part of WorkerInfo (thus flowing through existing HA/snapshot replication), and updating tag-based worker filtering to consider both config-store tags and worker-supplied tags.

Changes:

  • Add worker-supplied tags to the worker registration flow (worker → protobuf → master/meta managers) with a master-side toggle (celeborn.tags.worker.registration.enabled) to ignore them.
  • Persist tags in WorkerInfo and include them in protobuf (de)serialization for snapshots / RPC payloads.
  • Harden TagsManager logic and expand unit tests to cover config-store tags, self-registered tags, and mixed scenarios.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | Sends configured worker tags during registration and stores them on local workerInfo. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala | Reads worker-supplied tags from registration request (optionally ignoring them by config) and passes them into meta registration. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala | Updates tag filtering to consider both config-store tags and self-registered tags. |
| master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala | Adds/updates unit tests for combined config-store + self-registered tag behavior. |
| common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala | Adds tags field to WorkerInfo. |
| common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala | Serializes/deserializes WorkerInfo.tags to/from protobuf. |
| common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala | Adds tags to the RegisterWorker protobuf builder. |
| master/src/main/proto/Resource.proto | Adds tags to meta-side RegisterWorkerRequest. |
| common/src/main/proto/TransportMessages.proto | Adds tags to PbWorkerInfo and worker registration messages. |
| master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/*.java | Plumbs tags through register-worker meta update paths (single-master + HA). |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Introduces new config entries for worker tags and master toggle for honoring them. |
| docs/configuration/worker.md | Documents celeborn.worker.tags. |
| docs/configuration/master.md | Documents celeborn.tags.worker.registration.enabled. |


Comment on lines +54 to +66
+     val tags = resolveTagsExpr(userIdentifier, clientTagsExpr)
+       .split(",").map(_.trim).filter(_.nonEmpty)

-     if (tagsExpr.isEmpty) {
-       logWarning("No tags provided")
+     if (tags.isEmpty) {
+       logDebug("No tags provided, returning all workers")
        return workers
      }

-     val tags = tagsExpr.split(",").map(_.trim)
-
-     var workersForTags: Option[JSet[String]] = None
-     tags.foreach { tag =>
-       val taggedWorkers = getTagStore.getOrDefault(tag, Collections.emptySet())
-       workersForTags match {
-         case Some(w) =>
-           w.retainAll(taggedWorkers)
-         case _ =>
-           workersForTags = Some(new util.HashSet[String](taggedWorkers))
-       }
-     }
-
-     if (workersForTags.isEmpty) {
-       logWarning(s"No workers for tags: $tagsExpr found in cluster")
-       return Collections.emptyList()
-     }
-
      val workerTagsPredicate = new Predicate[WorkerInfo] {
-       override def test(w: WorkerInfo): Boolean = workersForTags.get.contains(w.toUniqueId)
+       override def test(w: WorkerInfo): Boolean = tags.forall { tag =>
+         w.tags.contains(tag) ||
+           tagStore.exists(_.getOrDefault(tag, Collections.emptySet()).contains(w.toUniqueId))
+       }

@SteNicholas SteNicholas left a comment

Member


Well-designed, cleanly executed feature. I traced the proto/serde/Raft wiring, the config gate, and the TagsManager refactor — the backward-compat story holds and I found no correctness bugs, only minor cleanups and one pre-existing limitation worth documenting. Approve with a few follow-ups.

Things I verified as correct

  • WorkerInfo.equals/hashCode key only on host + 4 ports: tags is excluded, so adding it doesn't perturb any map/set keyed on WorkerInfo.
  • Proto changes are backward compatible — all new repeated string tags fields; absent on old peers → empty list. The worker→master RPC (PbRegisterWorker), the Raft log (Resource.proto RegisterWorkerRequest), and snapshot serde (PbWorkerInfo via PbSerDeUtils) are wired end to end.
  • Config gate is at a single choke point (Master.scala strips tags to Set.empty when disabled) and feeds all three registration sub-paths + the HA path.
  • Removed addTagToWorker/removeTagFromWorker/removeTagFromCluster/defaultTagStore have zero callers anywhere in the tree — they were effectively dead in production, which matches the PR's premise. Good cleanup.
  • Heartbeat does not clobber tags: updateWorkerHeartbeatMeta mutates only disk/status fields on the existing WorkerInfo and never touches tags.
  • Defensive copies (new HashSet<>(tags)) on ingest; the new getTaggedWorkers predicate (AND across tags, self-OR-store per tag) matches the documented semantics.
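
The predicate semantics described above (AND across the requested tags, self-OR-store for each tag) can be illustrated with a minimal Java sketch. This is not the PR's Scala code: `TagFilterSketch`, its nested `Worker` class, and the `tagStore` map shape (tag to worker ids) are hypothetical stand-ins chosen for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class TagFilterSketch {
    // Hypothetical stand-in for WorkerInfo: a unique id plus self-declared tags.
    static final class Worker {
        final String uniqueId;
        final Set<String> tags;

        Worker(String uniqueId, String... tags) {
            this.uniqueId = uniqueId;
            this.tags = new HashSet<>(Arrays.asList(tags));
        }
    }

    // Splits a comma-separated expression, trimming and dropping empty entries.
    static List<String> parseTags(String tagsExpr) {
        return Arrays.stream(tagsExpr.split(","))
            .map(String::trim)
            .filter(t -> !t.isEmpty())
            .collect(Collectors.toList());
    }

    // Keeps workers that carry every requested tag, where each tag may come
    // from the worker itself or from the config-store map (tag -> worker ids).
    // An empty tag list matches every worker.
    static List<String> taggedWorkerIds(
            List<Worker> workers, String tagsExpr, Map<String, Set<String>> tagStore) {
        List<String> tags = parseTags(tagsExpr);
        return workers.stream()
            .filter(w -> tags.stream().allMatch(tag ->
                w.tags.contains(tag)
                    || tagStore.getOrDefault(tag, Collections.emptySet()).contains(w.uniqueId)))
            .map(w -> w.uniqueId)
            .collect(Collectors.toList());
    }

    // Fixed scenario: w1 self-declares gpu, w3 self-declares gpu and ssd,
    // and the config store assigns ssd to w1 and w2.
    static List<String> demo(String tagsExpr) {
        List<Worker> workers = Arrays.asList(
            new Worker("w1", "gpu"), new Worker("w2"), new Worker("w3", "gpu", "ssd"));
        Map<String, Set<String>> tagStore = new HashMap<>();
        tagStore.put("ssd", new HashSet<>(Arrays.asList("w1", "w2")));
        return taggedWorkerIds(workers, tagsExpr, tagStore);
    }

    public static void main(String[] args) {
        System.out.println(demo("gpu, ssd")); // [w1, w3]
        System.out.println(demo(" , "));      // [w1, w2, w3]
    }
}
```

In this scenario w1 qualifies for "gpu, ssd" only because its self-declared tag and its config-store tag are combined, which is the merged case the suite is said to cover.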

Minor follow-ups (none blocking)

  1. PbMetaRegisterWorkerRequest edit is dead + has a field-number gap. That message isn't referenced anywhere in the codebase, and the new tags = 12 skips field numbers 10–11. I'd drop that proto edit; if you keep it, use 10 to avoid a confusing gap.

  2. Tag changes don't refresh on a re-register over a live entry. AbstractMetaManager.updateRegisterWorkerMeta uses workersMap.putIfAbsent(...), so a worker that re-registers while its entry still exists keeps the old WorkerInfo (old/empty tags). This is pre-existing behavior shared with disk info, not introduced here — but combined with this feature it means new celeborn.worker.tags only take effect after the stale entry is evicted (worker-lost), not on an immediate restart/re-register. Worth a doc note or follow-up.

  3. tagStore is a def re-evaluated per (worker × tag) inside the predicate. getTags() is memoized so it's cheap (and actually avoids the old code's full-map copy), but hoisting it to one val at the top of getTaggedWorkers would be clearer and drop the per-pair Option allocations. Style nit.

  4. Design note: the merge is union-only — the config store can add tags to a worker but can't remove a worker's self-declared tag; the only override is the cluster-wide tagsWorkerRegistrationEnabled=false. Fine for v1, worth calling out in case per-worker override is later expected.
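
To make follow-up 2 concrete, here is a small Java sketch of how a putIfAbsent-based registry keeps stale tags on re-register. `StaleTagsSketch` and its methods are hypothetical and model only the map behaviour, not the real AbstractMetaManager.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class StaleTagsSketch {
    // Hypothetical simplified registry: unique worker id -> registered tags.
    private final Map<String, Set<String>> workersMap = new ConcurrentHashMap<>();

    // Mirrors the putIfAbsent behaviour: a live entry is never overwritten.
    void register(String workerId, String... tags) {
        workersMap.putIfAbsent(workerId, new HashSet<>(Arrays.asList(tags)));
    }

    // Models eviction of the entry once the worker is considered lost.
    void workerLost(String workerId) {
        workersMap.remove(workerId);
    }

    Set<String> tagsOf(String workerId) {
        return workersMap.getOrDefault(workerId, Collections.emptySet());
    }

    public static void main(String[] args) {
        StaleTagsSketch meta = new StaleTagsSketch();
        meta.register("w1", "hdd");
        meta.register("w1", "ssd");            // restart with new tags, entry still live
        System.out.println(meta.tagsOf("w1")); // [hdd]
        meta.workerLost("w1");                 // stale entry evicted
        meta.register("w1", "ssd");
        System.out.println(meta.tagsOf("w1")); // [ssd]
    }
}
```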

Testing

The TagsManagerSuite rewrite is good and covers self-tags, config-store, and merged cases. Gap: nothing exercises the registration → Raft → WorkerInfo.tags path — the unit tests set w.tags directly, bypassing the proto/serde wiring that is most of this PR. A small meta-manager test (register a worker carrying tags, assert getTaggedWorkers sees them, and assert tagsWorkerRegistrationEnabled=false drops them) would lock that down. The PR already notes e2e is in progress.
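
As a rough illustration of what such a test would assert, the sketch below models the gate in plain Java. `RegistrationGateSketch` is hypothetical: it assumes the master strips supplied tags to an empty set when the flag is off and takes a defensive copy otherwise, as described in the review, and it does not touch the real proto or Raft path.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class RegistrationGateSketch {
    // Models celeborn.tags.worker.registration.enabled on the master side.
    private final boolean workerTagsEnabled;
    // Hypothetical registry: unique worker id -> tags accepted at registration.
    private final Map<String, Set<String>> registered = new HashMap<>();

    RegistrationGateSketch(boolean workerTagsEnabled) {
        this.workerTagsEnabled = workerTagsEnabled;
    }

    // Single choke point: tags are dropped when the gate is off,
    // and copied defensively when it is on.
    void registerWorker(String workerId, Set<String> suppliedTags) {
        Set<String> effective;
        if (workerTagsEnabled) {
            effective = new HashSet<>(suppliedTags);
        } else {
            effective = Collections.emptySet();
        }
        registered.put(workerId, effective);
    }

    boolean hasTag(String workerId, String tag) {
        return registered.getOrDefault(workerId, Collections.emptySet()).contains(tag);
    }

    public static void main(String[] args) {
        Set<String> supplied = new HashSet<>(Arrays.asList("gpu"));

        RegistrationGateSketch enabled = new RegistrationGateSketch(true);
        enabled.registerWorker("w1", supplied);
        System.out.println(enabled.hasTag("w1", "gpu"));  // true

        RegistrationGateSketch disabled = new RegistrationGateSketch(false);
        disabled.registerWorker("w1", supplied);
        System.out.println(disabled.hasTag("w1", "gpu")); // false
    }
}
```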

🤖 Generated with Claude Code

@s0nskar

s0nskar commented Jun 9, 2026

Contributor Author

Tag changes don't refresh on a re-register over a live entry. AbstractMetaManager.updateRegisterWorkerMeta uses workersMap.putIfAbsent(...), so a worker that re-registers while its entry still exists keeps the old WorkerInfo (old/empty tags). This is pre-existing behavior shared with disk info, not introduced here — but combined with this feature it means new celeborn.worker.tags only take effect after the stale entry is evicted (worker-lost), not on an immediate restart/re-register. Worth a doc note or follow-up.

@SteNicholas What do you think about this? Should we update the tags in the next heartbeat, similar to disk infos, or should we change the code slightly and update these values during re-registration itself?

I think it would be cleaner to replace the existing entry with the new entry and update its reference in availableWorkers.

  synchronized (workersMap) {
      availableWorkers.remove(workerInfo);        // evict old entry which contains reference
      workersMap.put(workerInfo.toUniqueId(), workerInfo); 
      ...
      updateAvailableWorkers(workerInfo);         // re-adds new reference
  }
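
A self-contained Java sketch of why the eviction step matters, under the assumption that worker equality ignores tags (as the review notes for WorkerInfo) and that re-adding to availableWorkers is a plain set add. `ReplaceOnReRegisterSketch`, its nested `Worker`, and the `evictFirst` switch are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ReplaceOnReRegisterSketch {
    // Hypothetical stand-in for WorkerInfo: identity is the id only, tags excluded.
    static final class Worker {
        final String uniqueId;
        final Set<String> tags;

        Worker(String uniqueId, String... tags) {
            this.uniqueId = uniqueId;
            this.tags = new HashSet<>(Arrays.asList(tags));
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Worker && ((Worker) o).uniqueId.equals(uniqueId);
        }

        @Override
        public int hashCode() {
            return uniqueId.hashCode();
        }
    }

    final Map<String, Worker> workersMap = new HashMap<>();
    final Set<Worker> availableWorkers = new HashSet<>();

    // Replaces the map entry; evictFirst controls whether the old, equal
    // reference is removed from availableWorkers before the new one is added.
    void reRegister(Worker worker, boolean evictFirst) {
        synchronized (workersMap) {
            if (evictFirst) {
                availableWorkers.remove(worker); // drops the old reference
            }
            workersMap.put(worker.uniqueId, worker);
            availableWorkers.add(worker); // no-op while an equal element is present
        }
    }

    // Tags of the reference currently held in availableWorkers.
    Set<String> availableTags(String workerId) {
        for (Worker w : availableWorkers) {
            if (w.uniqueId.equals(workerId)) {
                return w.tags;
            }
        }
        return Collections.emptySet();
    }

    public static void main(String[] args) {
        ReplaceOnReRegisterSketch meta = new ReplaceOnReRegisterSketch();
        meta.reRegister(new Worker("w1", "hdd"), true);
        meta.reRegister(new Worker("w1", "ssd"), false);
        System.out.println(meta.availableTags("w1")); // [hdd]
        meta.reRegister(new Worker("w1", "ssd"), true);
        System.out.println(meta.availableTags("w1")); // [ssd]
    }
}
```

Because `HashSet.add` leaves the existing element in place when an equal one is already present, skipping the removal would leave availableWorkers pointing at the old object even though workersMap holds the new one.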
