Model training
Model Training
In this task, a model for recognition is trained in a federated manner. In this example, a model for recognizing the digit from the MNIST dataset (https://www.tensorflow.org/datasets/catalog/mnist).
Assets
For this procedure, two sets of assets are required. One for the model_creator
and one for model_contributor
. Possible examples for these assets can be found in substrate-client-decentralml/assets
.
Here's an example of the code and assets for the model_creator
:
model_creator
├── __init__.py
├── model_creator.py
├── setup.sh
├── requirements.txt
└── settings.py
model_creator.py
contains the python code for generating the first model, saving it and federated the results once the contributors have completed their training.setup.sh
is a script to setup the development environment for themodel_creator
requirements.txt
lists thepython
requirements for the model developementsettings.py
is just a support file for specifiying the model parameters for themodel_contributor
and the creator.
The model_creator
must also create the python code for the contributor to perform his task:
model_contributor
├── __init__.py
├── model_contributor.py
├── requirements.txt
├── settings.py
└── start_task.sh
model_contributor.py
contains the python code for the training of the modelrequirements.txt
lists thepython
requirements for the model developementsettings.py
is just a support file for specifiying the model parameters for themodel_contributor
and the creator.start_task.sh
is a script for themodel_contributor
to actually execute the task
Procedure
The
model_creator
starts by creating a model structure and compiles it:# assets/model_creator/model_creator.py def create_model(): model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dense(10) ]) model.compile( optimizer=tf.keras.optimizers.Adam(0.001), loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()], ) return model
Once the first model is generated, it can be trained:
# assets/model_creator/model_creator.py def train_model(model, x_train, y_train, epochs=1): model.fit(x_train, y_train, epochs) return model
Evaluated:
#assets/model_creator/model_creator.py def evaluate_model(model, x_test, y_test): return model.evaluate(x_test, y_test)
And more importantly, saved for the model contributors to use:
#assets/model_creator/model_creator.py def save_model(model, output_path, model_name): model.save(f"{output_path}/{model_name}")
In a federated system, the training step is generally delegated to the model contributors, but the model creator could perform some training just to initiate the system. The contributors can then subsequently training on new data.
Note all these steps are part of the
model_creator.py
in the assets folder.In order to start the federated training, the
model_creator
can then create a task for model contributors using the function:#decentralml/create_task.py def create_task_model_contributor(expiration_block, substrate, sudoaccount, passphrase, task_type, question, pays_amount, max_assignments, validation_strategy, model_contributor_script_path, model_contributor_script_storage_type, model_contributor_script_storage_credentials):
In which:
model_contributor_script_path
is the path to the assets for themodel_contributor
For additional info on the substrate parameters (i.e. expiration block, substrate, etc.) consult the documentation of the python client or view the example (https://github.com/livetreetech/DecentralML/blob/main/substrate-client-decentralml/src/decentralml/create_task.py).
The
model_contributor
then canlist_task
and accept a task with:#decentralml/assign_task.py def assign_task(substrate, sudoaccount, passphrase, task_id)
by specifying the
task_id
. Assigning a task will download the corresponding assets for model contributor task.The
model_contributor
can then start the task by executing the script provided by themodel_creator
:./start_task.sh
which setups the environment and starts the python code for the federated learning in the
model_contributor.py
file.A model contributor receive a model and trains it on a subset of data that can be either provided by the model creator, or provided locally by the contributor. In this example, the data are provided by the model creator as a method to load them:
#assets/model_contributor/model_contributor.py def load_data(): (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 return (x_train, y_train), (x_test, y_test)
The contributor can then load the model provided by the creator for the training:
#assets/model_contributor/model_contributor.py def load_model(model_path, model_name): model = tf.keras.models.load_model(f"{model_path}/{model_name}") print(model.summary()) return model
The contributor then starts the training of the model according with the indication of the creator (i.e. epochs, batch_size, etc.):
#assets/model_contributor/model_contributor.py def train_model(model, x_train, y_train, epoch=100, batch_size=32): model.fit(x_train, y_train, epochs=epoch, batch_size=batch_size) return model
The generated model can finally be saved for the model creator to federate the model training results:
#assets/model_contributor/model_contributor.py def save_model(model, output_path, model_name, contributo_id_length=10): contributor_id = get_random_string(contributo_id_length) model.save(f"{output_path}/{model_name}_{contributor_id}")
Saving the model creates an output folder which includes the structure and the weights. The name of the folder includes the model name and an id for the contributor:
example_model_hirabujcbw/ ├── assets ├── fingerprint.pb ├── keras_metadata.pb ├── saved_model.pb └── variables ├── variables.data-00000-of-00001 └── variables.index
The contributor ID in this example is a randomly generated string used to uniquely identify the different models generated by the contributors as part of the training.
Once the
model_contributor
has completed his task, themodel_contributor
can send the results using:#decentralml/send_task_result.py def send_task_result(substrate, keypair, submission_id, result, result_path, result_storage_type, result_storage_credentials)
This function accepts a parameter
result_path
which will have to be set to the output folder containing the saved model. Sending the results uploads the model training results to a remote and/or shared storage.The
model_creator
can list the available results for each task using thelist_task_results
(see Listing objects).Once, a result is available, the
model_creator
can start validating the results using thevalidate_task_results
. The validation of the results can be performed according to three policies:AutoAccept: the results are automatically accepted
ManualAccept: the
model_creator
manually accepts each task resultsCustomAccept: the
model_creator
can implement custom methods for automatically validating the results.
Starting the validation process downloads the results and the corresponding saved models. In this example, we explain a manual validation process. The
model_creator
can validate process by loading the federated models. For this example, the functions to federate the models are included in theassets/model_creator/model_creator.py
.#assets/model_creator/model_creator.py def load_contributors_models(contributors_models_path, model_name): model_contributors_folders_path = f"{contributors_models_path}/{model_name}_*" model_folders = [f for f in glob.glob(model_contributors_folders_path)] contributors_models = list() for model_folder in model_folders: model = tf.keras.models.load_model(model_folder) contributors_models.append(model) return contributors_models
Then, the results of each trained model are federated averaging the model weights:
#assets/model_creator/model_creator.py def federate_contributors_model(contributors_models, policy="average"): client_weights = [model.trainable_variables for model in contributors_models] new_weights = None if policy=="average": # Compute the average weights for each layer avg_weights = [ tf.reduce_mean(layer_weight_tensors, axis=0) for layer_weight_tensors in zip(*client_weights) ] new_weights = avg_weights return new_weights
Finally, the new weights can be applied to the same model structure that was originally created:
#assets/model_creator/model_creator.py def set_model_weights(model, weights): model.set_weights(weights) return model
This model can then be evaluated:
#assets/model_creator/model_creator.py def evaluate_model(model, x_test, y_test): return model.evaluate(x_test, y_test)
Once the validation process is complete, the
model_creator
or the automatic validation procedure can either accept or reject the results, using respectivelyaccept_task_results()
orreject_task_results()
.Accepting the results issues the payment to the contributor.
Last updated