[CELEBORN-2349] Support worker supplied tags and TagsManager hardening#3719
[CELEBORN-2349] Support worker supplied tags and TagsManager hardening#3719s0nskar wants to merge 5 commits into
Conversation
|
@SteNicholas can you please trigger Copilot magic on this. |
There was a problem hiding this comment.
Pull request overview
This PR extends Celeborn’s worker tagging mechanism by allowing workers to advertise tags during registration, persisting those tags as part of WorkerInfo (thus flowing through existing HA/snapshot replication), and updating tag-based worker filtering to consider both config-store tags and worker-supplied tags.
Changes:
- Add worker-supplied tags to the worker registration flow (worker → protobuf → master/meta managers) with a master-side toggle (
celeborn.tags.worker.registration.enabled) to ignore them. - Persist tags in
WorkerInfoand include them in protobuf (de)serialization for snapshots / RPC payloads. - Harden
TagsManagerlogic and expand unit tests to cover config-store tags, self-registered tags, and mixed scenarios.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | Sends configured worker tags during registration and stores them on local workerInfo. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala | Reads worker-supplied tags from registration request (optionally ignoring them by config) and passes them into meta registration. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala | Updates tag filtering to consider both config-store tags and self-registered tags. |
| master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala | Adds/updates unit tests for combined config-store + self-registered tag behavior. |
| common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala | Adds tags field to WorkerInfo. |
| common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala | Serializes/deserializes WorkerInfo.tags to/from protobuf. |
| common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala | Adds tags to the RegisterWorker protobuf builder. |
| master/src/main/proto/Resource.proto | Adds tags to meta-side RegisterWorkerRequest. |
| common/src/main/proto/TransportMessages.proto | Adds tags to PbWorkerInfo and worker registration messages. |
| master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/*.java | Plumbs tags through register-worker meta update paths (single-master + HA). |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Introduces new config entries for worker tags and master toggle for honoring them. |
| docs/configuration/worker.md | Documents celeborn.worker.tags. |
| docs/configuration/master.md | Documents celeborn.tags.worker.registration.enabled. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val tags = resolveTagsExpr(userIdentifier, clientTagsExpr) | ||
| .split(",").map(_.trim).filter(_.nonEmpty) | ||
|
|
||
| if (tagsExpr.isEmpty) { | ||
| logWarning("No tags provided") | ||
| if (tags.isEmpty) { | ||
| logDebug("No tags provided, returning all workers") | ||
| return workers | ||
| } | ||
|
|
||
| val tags = tagsExpr.split(",").map(_.trim) | ||
|
|
||
| var workersForTags: Option[JSet[String]] = None | ||
| tags.foreach { tag => | ||
| val taggedWorkers = getTagStore.getOrDefault(tag, Collections.emptySet()) | ||
| workersForTags match { | ||
| case Some(w) => | ||
| w.retainAll(taggedWorkers) | ||
| case _ => | ||
| workersForTags = Some(new util.HashSet[String](taggedWorkers)) | ||
| } | ||
| } | ||
|
|
||
| if (workersForTags.isEmpty) { | ||
| logWarning(s"No workers for tags: $tagsExpr found in cluster") | ||
| return Collections.emptyList() | ||
| } | ||
|
|
||
| val workerTagsPredicate = new Predicate[WorkerInfo] { | ||
| override def test(w: WorkerInfo): Boolean = workersForTags.get.contains(w.toUniqueId) | ||
| override def test(w: WorkerInfo): Boolean = tags.forall { tag => | ||
| w.tags.contains(tag) || | ||
| tagStore.exists(_.getOrDefault(tag, Collections.emptySet()).contains(w.toUniqueId)) | ||
| } |
SteNicholas
left a comment
There was a problem hiding this comment.
Well-designed, cleanly executed feature. I traced the proto/serde/Raft wiring, the config gate, and the TagsManager refactor — the backward-compat story holds and I found no correctness bugs, only minor cleanups and one pre-existing limitation worth documenting. Approve with a few follow-ups.
Things I verified as correct
WorkerInfo.equals/hashCodekey only on host + 4 ports —tagsis excluded, so adding it doesn't perturb any map/set keyed onWorkerInfo.- Proto changes are backward compatible — all new
repeated string tagsfields; absent on old peers → empty list. The worker→master RPC (PbRegisterWorker), the Raft log (Resource.proto RegisterWorkerRequest), and snapshot serde (PbWorkerInfoviaPbSerDeUtils) are wired end to end. - Config gate is at a single choke point (
Master.scalastrips tags toSet.emptywhen disabled) and feeds all three registration sub-paths + the HA path. - Removed
addTagToWorker/removeTagFromWorker/removeTagFromCluster/defaultTagStorehave zero callers anywhere in the tree — they were effectively dead in production, which matches the PR's premise. Good cleanup. - Heartbeat does not clobber tags —
updateWorkerHeartbeatMetamutates only disk/status fields on the existingWorkerInfoand never touchestags. - Defensive copies (
new HashSet<>(tags)) on ingest; the newgetTaggedWorkerspredicate (AND across tags, self-OR-store per tag) matches the documented semantics.
Minor follow-ups (none blocking)
-
PbMetaRegisterWorkerRequestedit is dead + has a field-number gap. That message isn't referenced anywhere in the codebase, and the newtags = 12skips field numbers 10–11. I'd drop that proto edit; if you keep it, use10to avoid a confusing gap. -
Tag changes don't refresh on a re-register over a live entry.
AbstractMetaManager.updateRegisterWorkerMetausesworkersMap.putIfAbsent(...), so a worker that re-registers while its entry still exists keeps the oldWorkerInfo(old/empty tags). This is pre-existing behavior shared with disk info, not introduced here — but combined with this feature it means newceleborn.worker.tagsonly take effect after the stale entry is evicted (worker-lost), not on an immediate restart/re-register. Worth a doc note or follow-up. -
tagStoreis adefre-evaluated per (worker × tag) inside the predicate.getTags()is memoized so it's cheap (and actually avoids the old code's full-map copy), but hoisting it to onevalat the top ofgetTaggedWorkerswould be clearer and drop the per-pairOptionallocations. Style nit. -
Design note: the merge is union-only — the config store can add tags to a worker but can't remove a worker's self-declared tag; the only override is the cluster-wide
tagsWorkerRegistrationEnabled=false. Fine for v1, worth calling out in case per-worker override is later expected.
Testing
The TagsManagerSuite rewrite is good and covers self-tags, config-store, and merged cases. Gap: nothing exercises the registration → Raft → WorkerInfo.tags path — the unit tests set w.tags directly, bypassing the proto/serde wiring that is most of this PR. A small meta-manager test (register a worker carrying tags, assert getTaggedWorkers sees them, and assert tagsWorkerRegistrationEnabled=false drops them) would lock that down. The PR already notes e2e is in progress.
🤖 Generated with Claude Code
@SteNicholas What do you think about this? Should we update the tags in next heartbeat similar to disk infos. Or should we change the code slightly and update these values during re-registration itself. I think it will be cleaner if replace the existing entry with new entry and update it's reference in availableWorkers. |
What changes were proposed in this pull request?
This PR lets a worker advertise tags at startup and makes tags part of WorkerInfo, so they will use existing Raft replication and worker-lifecycle cleanup. This also gel well with current implementation, allowing tags from config service to be merged with worker supplied tags.
Backward Compatible:
celeborn.tags.worker.registration.enabled(default: true), will allow to ignore worker-supplied tags.Why are the changes needed?
TagsManagertoday only works off the dynamic-config store. There is no way for a worker to declare its own tags. This makes this feature a little hard to use.After this PR a worker can supply its own tags and manage its own tags lifecycle and still allow tags overriding via config service. This will make worker tags feature a lot easier to use.
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?