Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ message PbWorkerInfo {
int32 internalPort = 8;
string networkLocation = 9;
int64 nextInterruptionNotice = 10; // Unix timestamp when disruption is expected to be initiated
repeated string tags = 11;
}

message PbFileGroup {
Expand All @@ -207,6 +208,7 @@ message PbRegisterWorker {
map<string, PbResourceConsumption> userResourceConsumption = 8;
int32 internalPort = 10;
string networkLocation = 11;
repeated string tags = 12;
}

message PbMetaRegisterWorkerRequest {
Expand All @@ -219,6 +221,7 @@ message PbMetaRegisterWorkerRequest {
map<string, PbResourceConsumption> userResourceConsumption = 7;
int32 internalPort = 8;
string networkLocation = 9;
repeated string tags = 12;
}

message PbHeartbeatFromWorker {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)

// Worker-tag settings. Each accessor simply reads its ConfigEntry through `get`.
// Whether tags for workers are enabled.
def tagsEnabled: Boolean = get(TAGS_ENABLED)
// Master side: whether tags advertised by workers at registration are honored.
def tagsWorkerRegistrationEnabled: Boolean = get(TAGS_WORKER_REGISTRATION_ENABLED)
// Worker side: tags this worker supplies to the master at registration.
def workerTags: Seq[String] = get(WORKER_TAGS)
// Expression used to filter workers by tags.
def tagsExpr: String = get(TAGS_EXPR)
// Whether the client-provided tags expression is preferred over the master's.
def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)

Expand Down Expand Up @@ -6845,6 +6847,24 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

// Master-side switch (default: true) controlling whether tags carried in a worker's
// registration request are used. When it is false, the master's RegisterWorker handling
// replaces the worker-supplied tags with an empty set before registering the worker.
val TAGS_WORKER_REGISTRATION_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.tags.worker.registration.enabled")
.categories("master")
.version("0.7.0")
.doc("When true, the master honors tags advertised by workers at registration " +
"(merged with the config-store tags). When false, worker-supplied tags are ignored.")
.booleanConf
.createWithDefault(true)

// Worker-side list of tags sent to the master in the registration request.
// Declared as a string conf converted to a sequence (per the doc string, a
// comma-separated list); defaults to an empty sequence, i.e. no tags advertised.
val WORKER_TAGS: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.tags")
.categories("worker")
.version("0.7.0")
.doc("Comma-separated tags this worker supplies to the master at registration.")
.stringConf
.toSequence
.createWithDefault(Seq.empty)

val TAGS_EXPR: ConfigEntry[String] =
buildConf("celeborn.tags.tagsExpr")
.categories("master", "client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class WorkerInfo(
var nextInterruptionNotice = Long.MaxValue
var lastHeartbeat: Long = 0
var workerStatus = WorkerStatus.normalWorkerStatus()
var isHighWorkLoad: Boolean = false;
var isHighWorkLoad: Boolean = false
var tags: util.Set[String] = new util.HashSet[String]()
val diskInfos = {
if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos)
else null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ object ControlMessages extends Logging {
networkLocation: String,
disks: Map[String, DiskInfo],
userResourceConsumption: Map[UserIdentifier, ResourceConsumption],
tags: Set[String],
requestId: String): PbRegisterWorker = {
val pbDisks = disks.values.map(PbSerDeUtils.toPbDiskInfo).asJava
val pbUserResourceConsumption =
Expand All @@ -107,6 +108,7 @@ object ControlMessages extends Logging {
.setNetworkLocation(networkLocation)
.addAllDisks(pbDisks)
.putAllUserResourceConsumption(pbUserResourceConsumption)
.addAllTags(tags.asJava)
.setRequestId(requestId)
.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ object PbSerDeUtils {
} else {
workerInfo.nextInterruptionNotice = pbWorkerInfo.getNextInterruptionNotice
}
workerInfo.tags = new util.HashSet[String](pbWorkerInfo.getTagsList)
workerInfo
}

Expand All @@ -310,6 +311,7 @@ object PbSerDeUtils {
.setPushPort(workerInfo.pushPort)
.setReplicatePort(workerInfo.replicatePort)
.setInternalPort(workerInfo.internalPort)
.addAllTags(workerInfo.tags)
if (masterPersistWorkerNetworkLocation) {
builder.setNetworkLocation(workerInfo.networkLocation)
}
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,5 @@ license: |
| celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 0.6.0 | |
| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the tags expression provided by the client over the tags expression provided by the master. | 0.6.0 | |
| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | |
| celeborn.tags.worker.registration.enabled | true | false | When true, the master honors tags advertised by workers at registration (merged with the config-store tags). When false, worker-supplied tags are ignored. | 0.7.0 | |
<!--end-include-->
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ license: |
| celeborn.worker.storage.storagePolicy.createFilePolicy | &lt;undefined&gt; | false | This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,S3,OSS | 0.5.1 | |
| celeborn.worker.storage.storagePolicy.evictPolicy | &lt;undefined&gt; | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS,S3,OSS. Definition: StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The example means that a MEMORY shuffle file can be evicted to SSD and a SSD shuffle file can be evicted to HDFS. | 0.5.1 | |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.tags | | false | Comma-separated tags this worker supplies to the master at registration. | 0.7.0 | |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | |
| celeborn.worker.writer.create.parallel.enabled | false | false | Whether to parallelize the creation of file writer. | 0.6.3 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ public void updateRegisterWorkerMeta(
int replicatePort,
int internalPort,
String networkLocation,
Map<String, DiskInfo> disks) {
Map<String, DiskInfo> disks,
Set<String> tags) {
WorkerInfo workerInfo =
new WorkerInfo(
host,
Expand All @@ -370,6 +371,7 @@ public void updateRegisterWorkerMeta(
disks,
new HashMap<>());
workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
workerInfo.tags_$eq(new HashSet<>(tags));
if (networkLocation != null
&& !networkLocation.isEmpty()
&& !NetworkTopology.DEFAULT_RACK.equals(networkLocation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.celeborn.service.deploy.master.clustermeta;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.ApplicationMeta;
Expand Down Expand Up @@ -77,6 +79,31 @@ void handleWorkerHeartbeat(
WorkerStatus workerStatus,
String requestId);

/**
 * Registers a worker that advertises no tags.
 *
 * <p>Convenience overload: every argument is forwarded unchanged to the tag-aware
 * {@code handleRegisterWorker} overload, with an empty tag set supplied in place of the
 * missing {@code tags} argument.
 */
default void handleRegisterWorker(
    String host,
    int rpcPort,
    int pushPort,
    int fetchPort,
    int replicatePort,
    int internalPort,
    String networkLocation,
    Map<String, DiskInfo> disks,
    Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
    String requestId) {
  // Callers of this overload carry no tag information, so register with none.
  final Set<String> noTags = Collections.emptySet();
  handleRegisterWorker(
      host, rpcPort, pushPort, fetchPort, replicatePort, internalPort,
      networkLocation, disks, userResourceConsumption, noTags, requestId);
}

void handleRegisterWorker(
String host,
int rpcPort,
Expand All @@ -87,6 +114,7 @@ void handleRegisterWorker(
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Set<String> tags,
String requestId);

void handleReportWorkerUnavailable(List<WorkerInfo> failedNodes, String requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -162,9 +163,18 @@ public void handleRegisterWorker(
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Set<String> tags,
String requestId) {
updateRegisterWorkerMeta(
host, rpcPort, pushPort, fetchPort, replicatePort, internalPort, networkLocation, disks);
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
internalPort,
networkLocation,
disks,
tags);
updateWorkerResourceConsumptions(
host, rpcPort, pushPort, fetchPort, replicatePort, userResourceConsumption);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -349,6 +350,7 @@ public void handleRegisterWorker(
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Set<String> tags,
String requestId) {
try {
ratisServer.submitRequest(
Expand All @@ -365,6 +367,7 @@ public void handleRegisterWorker(
.setInternalPort(internalPort)
.setNetworkLocation(networkLocation)
.putAllDisks(MetaUtil.toPbDiskInfos(disks))
.addAllTags(tags)
.build())
.build());
updateWorkerResourceConsumptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public org.apache.celeborn.common.protocol.PbMetaRequestResponse handleWriteRequ
int internalPort = request.getRegisterWorkerRequest().getInternalPort();
Map<String, PbDiskInfo> pbDiskInfo = request.getRegisterWorkerRequest().getDisksMap();
diskInfos = MetaUtil.fromPbDiskInfoMap(pbDiskInfo);
Set<String> tags = new HashSet<>(request.getRegisterWorkerRequest().getTagsList());
LOG.debug(
"Handle worker register for {} {} {} {} {} {} {}",
host,
Expand All @@ -275,7 +276,8 @@ public org.apache.celeborn.common.protocol.PbMetaRequestResponse handleWriteRequ
replicatePort,
internalPort,
networkLocation,
diskInfos);
diskInfos,
tags);
break;

case ReportWorkerUnavailable:
Expand Down
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ message RegisterWorkerRequest {
map<string, ResourceConsumption> userResourceConsumption = 7;
int32 internalPort = 8;
optional string networkLocation = 9;
repeated string tags = 10;
}

message ReportWorkerUnavailableRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ private[celeborn] class Master(
.toMap.asJava
val userResourceConsumption =
PbSerDeUtils.fromPbUserResourceConsumption(pbRegisterWorker.getUserResourceConsumptionMap)
val tags =
if (conf.tagsWorkerRegistrationEnabled) pbRegisterWorker.getTagsList.asScala.toSet
else Set.empty[String]

logDebug(s"Received RegisterWorker request $requestId, $host:$pushPort:$replicatePort" +
s" $disks.")
Expand All @@ -525,6 +528,7 @@ private[celeborn] class Master(
networkLocation,
disks,
userResourceConsumption,
tags,
requestId))

case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _, _) =>
Expand Down Expand Up @@ -845,6 +849,7 @@ private[celeborn] class Master(
networkLocation: String,
disks: util.Map[String, DiskInfo],
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
tags: Set[String],
requestId: String): Unit = {
val workerToRegister =
new WorkerInfo(
Expand Down Expand Up @@ -880,6 +885,7 @@ private[celeborn] class Master(
networkLocation,
disks,
userResourceConsumption,
tags.asJava,
requestId)
context.reply(RegisterWorkerResponse(true, "Worker in snapshot, re-register."))
} else if (statusSystem.workerLostEvents.contains(workerToRegister)) {
Expand All @@ -896,6 +902,7 @@ private[celeborn] class Master(
networkLocation,
disks,
userResourceConsumption,
tags.asJava,
requestId)
context.reply(RegisterWorkerResponse(true, "Worker in workerLostEvents, re-register."))
} else {
Expand All @@ -909,6 +916,7 @@ private[celeborn] class Master(
networkLocation,
disks,
userResourceConsumption,
tags.asJava,
requestId)
logInfo(s"Registered worker $workerToRegister.")
context.reply(RegisterWorkerResponse(true, ""))
Expand Down
Loading
Loading