15 changes: 8 additions & 7 deletions benchmark/configs/async_fl/async_fl.yml
@@ -2,13 +2,13 @@

# ========== Cluster configuration ==========
# ip address of the parameter server (needs 1 GPU process)
-ps_ip: 10.0.0.1
+ps_ip: localhost

# ip address of each worker : number of available GPU processes on that node
# Note that if we collocate the ps and a worker on the same GPU, we need to decrease the number of available processes on that GPU by 1
# E.g., if the master node has 4 available processes, then 1 goes to the ps, and worker should be set to worker:3
worker_ips:
-    - 10.0.0.1:[5]
+    - localhost:[4]

exp_path: $FEDSCALE_HOME/fedscale/core

@@ -42,8 +42,8 @@ job_conf:
- data_map_file: $FEDSCALE_HOME/benchmark/dataset/data/femnist/client_data_mapping/train.csv # Allocation of data to each client, turn to iid setting if not provided
- device_conf_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_device_capacity # Path of the client trace
- device_avail_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_behave_trace
-    - model: resnet56_cifar100 # NOTE: Please refer to our model zoo README and use models for these small image (e.g., 32x32x3) inputs
-    - model_zoo: fedscale-zoo
+    - model: resnet18 # NOTE: Please refer to our model zoo README and use models for these small image (e.g., 32x32x3) inputs
+    # - model_zoo: fedscale-zoo
- eval_interval: 5 # How many rounds to run a testing on the testing set
- rounds: 1000 # Number of rounds to run this training. We use 1000 in our paper, while it may converge w/ ~400 rounds
- filter_less: 21 # Remove clients w/ less than 21 samples
@@ -52,9 +52,10 @@ job_conf:
- learning_rate: 0.05
- batch_size: 20
- test_bsz: 20
- ps_port: 12345
- use_cuda: True
- decay_round: 50
- overcommitment: 1.0
-    - arrival_interval: 2
+    - arrival_interval: 10
+    - max_staleness: 0
+    - max_concurrency: 50
-    - async_buffer: 20 # Number of updates need to be aggregated before generating new model version
+    - async_buffer: 50 # Number of updates need to be aggregated before generating new model version
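
One implication of the collocation note at the top of this file, as a hedged standalone check (the variable names and the process count are illustrative assumptions, not FedScale code):

```python
# Hypothetical check of the collocation rule described at the top of this yml:
# if the ps shares a node with workers, one process slot goes to the ps.
ps_ip = "localhost"
worker_ips = {"localhost": 4}        # node -> worker processes, as in this yml
available_procs = {"localhost": 5}   # assumed usable GPU processes per node

if ps_ip in worker_ips:
    # One process on the ps node is reserved for the parameter server.
    assert worker_ips[ps_ip] <= available_procs[ps_ip] - 1, \
        "reserve one process on the ps node for the parameter server"
```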
4 changes: 2 additions & 2 deletions benchmark/configs/cifar_cpu/cifar_cpu.yml
@@ -34,8 +34,8 @@ job_conf:
- num_participants: 4 # Number of participants per round, we use K=100 in our paper, large K will be much slower
- data_set: cifar10 # Dataset: openImg, google_speech, stackoverflow
- data_dir: $FEDSCALE_HOME/benchmark/dataset/data/ # Path of the dataset
-    - model: resnet56_cifar10 # NOTE: Please refer to our model zoo README and use models for these small image (e.g., 32x32x3) inputs
-    - model_zoo: fedscale-zoo # Default zoo (torchcv) uses the pytorchvision zoo, which can not support small images well
+    - model: shufflenet_v2_x2_0 # NOTE: Please refer to our model zoo README and use models for these small image (e.g., 32x32x3) inputs
+    # - model_zoo: fedscale-zoo # Default zoo (torchcv) uses the pytorchvision zoo, which can not support small images well
- eval_interval: 5 # How many rounds to run a testing on the testing set
- rounds: 600 # Number of rounds to run this training. We use 1000 in our paper, while it may converge w/ ~400 rounds
- filter_less: 0 # Remove clients w/ less than 21 samples
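Context for the swap above: with `model_zoo` left commented out, the model name is resolved against a torchvision-style zoo, so the config must name a model that zoo actually provides. A minimal sketch of what that implies (an illustration, not FedScale's loader code):

```python
import torchvision.models as models

# shufflenet_v2_x2_0 is a valid torchvision model name; cifar10 has 10 classes.
model = models.shufflenet_v2_x2_0(num_classes=10)
print(sum(p.numel() for p in model.parameters()))  # rough parameter-count check
```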
4 changes: 2 additions & 2 deletions benchmark/configs/femnist/conf.yml
@@ -38,8 +38,8 @@ job_conf:
- data_map_file: $FEDSCALE_HOME/benchmark/dataset/data/femnist/client_data_mapping/train.csv # Allocation of data to each client, turn to iid setting if not provided
- device_conf_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_device_capacity # Path of the client trace
- device_avail_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_behave_trace
-    - model: resnet56_cifar10 # NOTE: Please refer to our model zoo README and use models for these small image (e.g., 32x32x3) inputs
-    - model_zoo: fedscale-zoo
+    - model: resnet18 # NOTE: Please refer to our model zoo README and use models for these small image (e.g., 32x32x3) inputs
+    # - model_zoo: fedscale-zoo
- eval_interval: 10 # How many rounds to run a testing on the testing set
- rounds: 5000 # Number of rounds to run this training. We use 1000 in our paper, while it may converge w/ ~400 rounds
- filter_less: 21 # Remove clients w/ less than 21 samples
69 changes: 42 additions & 27 deletions examples/async_fl/async_aggregator.py
@@ -28,10 +28,11 @@ def __init__(self, args):
Aggregator.__init__(self, args)
self.resource_manager = ResourceManager(self.experiment_mode)
self.async_buffer_size = args.async_buffer
+        self.max_concurrency = args.max_concurrency
self.client_round_duration = {}
self.client_start_time = collections.defaultdict(list)
self.round_stamp = [0]
-        self.client_model_version = {}
+        self.client_model_version = collections.defaultdict(list)
self.virtual_client_clock = {}
self.weight_tensor_type = {}

@@ -40,6 +41,8 @@ def __init__(self, args):
self.aggregate_update = {}
self.importance_sum = 0
self.client_end = []
+        self.round_staleness = []
+        self.model_concurrency = collections.defaultdict(int)

def tictak_client_tasks(self, sampled_clients, num_clients_to_collect):
Contributor:
In this async aggregator implementation, is there a notion of concurrency? In the PAPAYA paper, concurrency is a hyper-parameter in addition to the buffer size.
[image attachment]

Collaborator @AmberLJC (Aug 15, 2022):
That's a good point, and sorry for not incorporating PAPAYA's design. We should do two more things (see the sketch below):

  1. Add the concurrency hyper-parameter.
  2. Make sure that when assigning training tasks in tictak_client_tasks, the number of overlapping tasks doesn't exceed max_concurrency.
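
For concreteness, a minimal self-contained sketch of those two points in the PAPAYA/FedBuff style. Every name here (BufferedAsyncScheduler, launch, aggregate) is illustrative, not the FedScale API:

```python
class BufferedAsyncScheduler:
    """Sketch of buffered async FL with a concurrency cap (hypothetical API)."""

    def __init__(self, max_concurrency, async_buffer):
        self.max_concurrency = max_concurrency  # cap on overlapping tasks
        self.async_buffer = async_buffer        # updates per aggregation
        self.in_flight = 0
        self.pending = []                       # sampled, not yet dispatched
        self.buffer = []                        # received, not yet aggregated

    def maybe_dispatch(self, launch):
        # Point 2: overlapping tasks never exceed max_concurrency.
        while self.in_flight < self.max_concurrency and self.pending:
            launch(self.pending.pop(0))
            self.in_flight += 1

    def on_update(self, update, aggregate):
        self.in_flight -= 1
        self.buffer.append(update)
        # Aggregate once async_buffer updates arrive, then refill task slots.
        if len(self.buffer) >= self.async_buffer:
            aggregate(self.buffer)
            self.buffer.clear()

# Example wiring (illustrative):
sched = BufferedAsyncScheduler(max_concurrency=50, async_buffer=20)
sched.pending = list(range(100))              # sampled client ids
sched.maybe_dispatch(launch=lambda cid: None) # fills 50 of 100 slots
```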


@@ -108,8 +111,10 @@ def aggregate_client_weights(self, results):
"""
# Start to take the average of updates, and we do not keep updates to save memory
# Importance of each update is 1/sqrt(1 + staleness)
-        client_staleness = self.round - self.client_model_version[results['clientId']]
+        client_staleness = self.round - self.client_model_version[results['clientId']].pop(0)

importance = 1./(math.sqrt(1 + client_staleness))
+        self.round_staleness.append(client_staleness)

new_round_aggregation = (self.model_in_update == 1)
if new_round_aggregation:
@@ -134,25 +139,21 @@
self.aggregate_update[p] = param_weight * importance
else:
self.aggregate_update[p] += param_weight * importance

-                # self.model_weights[p].data += param_weight * importance
-                # else:
-                #     # Non-floats (e.g., num_batches), no need to aggregate but need to track
-                #     self.aggregate_update[p] = param_weight

if self.model_in_update == self.async_buffer_size:
for p in self.model_weights:
d_type = self.weight_tensor_type[p]
self.model_weights[p].data = (
-                    self.model_weights[p].data + self.aggregate_update[p]/self.importance_sum
+                    self.model_weights[p].data + self.aggregate_update[p] / float(self.importance_sum) # self.model_in_update
).to(dtype=d_type)

def round_completion_handler(self):
-        self.round += 1

logging.info(f"Round {self.round} average staleness {np.mean(self.round_staleness)}")
self.round_staleness = []
self.global_virtual_clock = self.round_stamp[-1]

+        self.round += 1

if self.round % self.args.decay_round == 0:
self.args.learning_rate = max(
self.args.learning_rate * self.args.decay_factor, self.args.min_learning_rate)
@@ -172,10 +173,10 @@ def round_completion_handler(self):

# update select participants
# NOTE: we simulate async, but have to sync every 20 rounds to avoid large divergence from the trace
-        if self.resource_manager.get_task_length() < self.async_buffer_size*2:
+        if self.resource_manager.get_task_length() < self.async_buffer_size:

self.sampled_participants = self.select_participants(
-                select_num_participants=self.async_buffer_size*20, overcommitment=self.args.overcommitment)
+                select_num_participants=self.async_buffer_size*5, overcommitment=self.args.overcommitment)
(clientsToRun, clientsStartTime, virtual_client_clock) = self.tictak_client_tasks(
self.sampled_participants, len(self.sampled_participants))

@@ -253,21 +254,30 @@ def get_client_conf(self, clientId):
def create_client_task(self, executorId):
"""Issue a new client training task to the executor"""

-        next_clientId = self.resource_manager.get_next_task(executorId)
train_config = None
model = None

-        if next_clientId != None:
-            config = self.get_client_conf(next_clientId)
-            start_time = self.client_start_time[next_clientId][0]
-            model_id = self.find_latest_model(start_time)
-            self.client_model_version[next_clientId] = model_id
-            end_time = self.client_round_duration[next_clientId] + start_time
-
-            # The executor has already received the model, thus transferring id is enough
-            model = model_id
-            train_config = {'client_id': next_clientId, 'task_config': config, 'end_time': end_time}
-            logging.info(f"Client {next_clientId} train on model {model_id} during {int(start_time)}-{int(end_time)}")
+        while True:
+            next_clientId = self.resource_manager.get_next_task(executorId)
+            if next_clientId != None:
+                config = self.get_client_conf(next_clientId)
+                start_time = self.client_start_time[next_clientId][0]
+                end_time = self.client_round_duration[next_clientId] + start_time
+                model_id = self.find_latest_model(start_time)
+                if end_time < self.round_stamp[-1] or self.model_concurrency[model_id] > self.max_concurrency + self.async_buffer_size:
+                    self.client_start_time[next_clientId].pop(0)
+                    continue
+
+                self.client_model_version[next_clientId].append(model_id)
+
+                # The executor has already received the model, thus transferring id is enough
+                model = model_id
+                train_config = {'client_id': next_clientId, 'task_config': config, 'end_time': end_time}
+                logging.info(
+                    f"Client {next_clientId} train on model {model_id} during {int(start_time)}-{int(end_time)}")
+                self.model_concurrency[model_id] += 1
+                break
+            else:
+                break

return train_config, model

@@ -290,11 +300,17 @@ def client_completion_handler(self, results):
# Format:
# -results = {'clientId':clientId, 'update_weight': model_param, 'moving_loss': round_train_loss,
# 'trained_size': count, 'wall_duration': time_cost, 'success': is_success 'utility': utility}

+        # [Async] some clients are scheduled earlier, which should be aggregated in previous round but receive the result late
Contributor:
For my understanding: why do we want to ignore clients that should have been aggregated in previous rounds? Don't we want to aggregate them anyway, with a staleness factor?

Collaborator:
Yeah, that's also a solution 🤔, if we ignore the fact that the supposed end_time of the result has passed.
(The reason we receive "past" results is that we schedule training tasks based on their end_time, but we cannot control the order in which tasks finish.)
I will either fix the grpc problem or just follow your suggestion. @fanlai0990, what do you think?

Collaborator (author):
I think we can aggregate it according to the staleness factor for now.

+        if self.client_round_duration[results['clientId']] + self.client_start_time[results['clientId']][0] < self.round_stamp[-1]:
+            # Ignore tasks that are issued earlier but finish late
+            self.client_start_time[results['clientId']].pop(0)
+            logging.info(f"Warning: Ignore late-response client {results['clientId']}")
+            return
+        if self.round - self.client_model_version[results['clientId']][0] > self.args.max_staleness:
+            logging.info(f"Warning: Ignore stale client {results['clientId']} with {self.round - self.client_model_version[results['clientId']][0]}")
+            self.client_model_version[results['clientId']].pop(0)
+            return

# [ASYNC] New checkin clients ID would overlap with previous unfinished clients
logging.info(f"Client {results['clientId']} completes from {self.client_start_time[results['clientId']][0]} to {self.client_start_time[results['clientId']][0]+self.client_round_duration[results['clientId']]}")
@@ -340,7 +356,6 @@ def CLIENT_EXECUTE_COMPLETION(self, request, context):
executor_id, client_id, event = request.executor_id, request.client_id, request.event
execution_status, execution_msg = request.status, request.msg
meta_result, data_result = request.meta_result, request.data_result
# logging.info(f"$$$$$$$$ ({executor_id}) CLIENT_EXECUTE_COMPLETION client {client_id} with event {event}")

if event == commons.CLIENT_TRAIN:
# Training results may be uploaded in CLIENT_EXECUTE_RESULT request later,
@@ -396,7 +411,7 @@ def event_monitor(self):
clientID = self.deserialize_response(data)['clientId']
logging.info(
f"last client {clientID} at round {self.round} ")

+                    # [ASYNC] handle different completion order
self.round_stamp.append(max(self.client_end))
self.client_end = []
self.round_completion_handler()
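To summarize the new control flow in this file: a result is dropped if it lands after the round it was scheduled into, or if its base model version is more than max_staleness rounds old; accepted updates are weighted by 1/sqrt(1 + staleness) and folded in as w <- w + sum_i(importance_i * delta_i) / sum_i(importance_i) once async_buffer results have arrived. A condensed, hedged sketch of the acceptance policy (the helper name and flat argument list are assumptions; the real logic lives across the methods above):

```python
import math

def accept_and_weight(current_round, model_version, end_time,
                      last_round_stamp, max_staleness):
    """Condensed acceptance policy from this diff (illustrative helper)."""
    if end_time < last_round_stamp:
        return None   # late response: its round has already been sealed
    staleness = current_round - model_version
    if staleness > max_staleness:
        return None   # trained on a model version that is now too old
    return 1.0 / math.sqrt(1.0 + staleness)
```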
5 changes: 3 additions & 2 deletions examples/async_fl/async_executor.py
@@ -70,7 +70,7 @@ def testing_handler(self, args, config=None):

evalStart = time.time()
device = self.device
-        model = self.load_global_model()# config['test_model']
+        model = self.load_global_model()  # config['test_model']
if self.task == 'rl':
client = RLClient(args)
test_res = client.test(args, self.this_rank, model, device=device)
@@ -128,6 +128,7 @@ def event_monitor(self):
train_model = self.deserialize_response(request.data)
if train_model is not None and not self.check_model_version(train_model):
# The executor may have not received the model due to async grpc
+                    # TODO: server will lose track of scheduled but not executed task and remove the model
self.event_queue.append(request)
logging.error(f"Warning: Not receive model {train_model} for client {train_config['client_id'] }")
time.sleep(1)
@@ -147,7 +148,7 @@

elif current_event == commons.MODEL_TEST:
test_configs = self.deserialize_response(request.meta)
-            self.remove_stale_models(test_configs['straggler_round'])
+            # self.remove_stale_models(test_configs['straggler_round'])
self.Test(test_configs)

elif current_event == commons.UPDATE_MODEL:
6 changes: 3 additions & 3 deletions fedscale/core/aggregation/aggregator.py
@@ -384,8 +384,7 @@ def client_completion_handler(self, results):
results['moving_loss']),
time_stamp=self.round,
duration=self.virtual_client_clock[results['clientId']]['computation'] +
-                     self.virtual_client_clock[results['clientId']
-                                               ]['communication']
+                     self.virtual_client_clock[results['clientId']]['communication']
)

# ================== Aggregate weights ======================
@@ -850,7 +849,8 @@ def CLIENT_EXECUTE_COMPLETION(self, request, context):
executor_id, event, meta_result, data_result)
else:
logging.error(f"Received undefined event {event} from client {client_id}")


+        # TODO: whether we should schedule tasks when client_ping or client_complete
if self.resource_manager.has_next_task(executor_id):
# NOTE: we do not pop the train immediately in simulation mode,
# since the executor may run multiple clients
1 change: 1 addition & 0 deletions fedscale/core/config_parser.py
@@ -103,6 +103,7 @@
# for asynchronous FL buffer size
parser.add_argument('--max_concurrency', type=int, default=100)
parser.add_argument('--async_buffer', type=int, default=10)
+parser.add_argument('--max_staleness', type=int, default=5)
parser.add_argument(
'--checkin_period', type=int, default=50, help='number of rounds to sample async clients'
)
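A standalone reproduction of the three async flags, useful for checking defaults outside FedScale (types and defaults copied from the lines above; note that async_fl.yml overrides max_staleness to 0 and async_buffer to 50):

```python
import argparse

p = argparse.ArgumentParser()
p.add_argument('--max_concurrency', type=int, default=100)
p.add_argument('--async_buffer', type=int, default=10)
p.add_argument('--max_staleness', type=int, default=5)

args = p.parse_args([])   # no overrides: take the defaults
print(args.max_concurrency, args.async_buffer, args.max_staleness)  # 100 10 5
```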