Model training

In this task, a recognition model is trained in a federated manner. The example trains a model that recognizes handwritten digits from the MNIST dataset (https://www.tensorflow.org/datasets/catalog/mnist).

Assets

This procedure requires two sets of assets: one for the model_creator and one for the model_contributor. Examples of these assets can be found in substrate-client-decentralml/assets.

Here's an example of the code and assets for the model_creator:

model_creator
├── __init__.py
├── model_creator.py
├── setup.sh
├── requirements.txt
└── settings.py
  • model_creator.py contains the Python code for generating the first model, saving it, and federating the results once the contributors have completed their training.

  • setup.sh is a script to set up the development environment for the model_creator

  • requirements.txt lists the Python requirements for model development

  • settings.py is a support file for specifying the model parameters for the model_contributor and the model_creator.

The model_creator must also create the Python code that the contributor uses to perform the task:

model_contributor
├── __init__.py
├── model_contributor.py
├── requirements.txt
├── settings.py
└── start_task.sh
  • model_contributor.py contains the Python code for training the model

  • requirements.txt lists the Python requirements for model development

  • settings.py is a support file for specifying the model parameters for the model_contributor and the model_creator.

  • start_task.sh is a script for the model_contributor to actually execute the task
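
The contents of settings.py are not reproduced in this guide. As a rough sketch only, a shared settings file could look like the following. All constant names are hypothetical; the values mirror the defaults and folder names that appear in the code later in this page.

```python
# settings.py (hypothetical contents; the real file in the repository may differ)

# Name used when saving and loading the shared model (assumed value)
MODEL_NAME = "example_model"
# Folder where saved models are written (assumed value)
OUTPUT_PATH = "./output"
# Training hyperparameters the creator asks contributors to use
EPOCHS = 100
BATCH_SIZE = 32
# Length of the random contributor ID appended to the saved model folder
CONTRIBUTOR_ID_LENGTH = 10
```

Keeping these values in one module lets the creator and the contributor scripts import the same parameters instead of repeating them.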

Procedure

  1. The model_creator starts by creating a model structure and compiles it:

    # assets/model_creator/model_creator.py
    def create_model():
        model = tf.keras.models.Sequential([
                    tf.keras.layers.Flatten(input_shape=(28, 28)),
                    tf.keras.layers.Dense(128, activation='relu'),
                    tf.keras.layers.Dense(10)
                    ])
        model.compile(
            optimizer=tf.keras.optimizers.Adam(0.001),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
        )
        return model

    Once the first model is generated, it can be trained:

    # assets/model_creator/model_creator.py
    def train_model(model, x_train, y_train, epochs=1):
        # Pass epochs by keyword: the third positional argument of fit() is batch_size
        model.fit(x_train, y_train, epochs=epochs)
        return model

    It can then be evaluated:

    #assets/model_creator/model_creator.py
    def evaluate_model(model, x_test, y_test):
        return model.evaluate(x_test, y_test)

    And, more importantly, it can be saved for the model contributors to use:

    #assets/model_creator/model_creator.py
    def save_model(model, output_path, model_name):
        model.save(f"{output_path}/{model_name}")

    In a federated system, the training step is generally delegated to the model contributors, but the model creator can perform some initial training to bootstrap the system. The contributors can subsequently train on new data.

    Note that all these steps are part of model_creator.py in the assets folder.

  2. In order to start the federated training, the model_creator can then create a task for model contributors using the function:

    #decentralml/create_task.py
    def create_task_model_contributor(expiration_block, substrate, sudoaccount, passphrase, task_type, question, pays_amount, max_assignments, validation_strategy, model_contributor_script_path, model_contributor_script_storage_type, model_contributor_script_storage_credentials):

    In which:

    • model_contributor_script_path is the path to the assets for the model_contributor

    For additional information on the substrate parameters (e.g. expiration_block, substrate), consult the documentation of the Python client or view the example (https://github.com/livetreetech/DecentralML/blob/main/substrate-client-decentralml/src/decentralml/create_task.py).

  3. The model_contributor can then list the available tasks (list_task) and accept one with:

    #decentralml/assign_task.py
    def assign_task(substrate, sudoaccount, passphrase, task_id)

    by specifying the task_id. Assigning a task downloads the corresponding assets for the model contributor task.

  4. The model_contributor can then start the task by executing the script provided by the model_creator:

    ./start_task.sh

    which sets up the environment and starts the Python code for the federated learning in the model_contributor.py file.

    A model contributor receives a model and trains it on a subset of data that is either provided by the model creator or supplied locally by the contributor. In this example, the model creator provides the data through a loading function:

    #assets/model_contributor/model_contributor.py
    def load_data():
        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        x_train, x_test = x_train / 255.0, x_test / 255.0
        return (x_train, y_train), (x_test, y_test)
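
    The load_data function above returns the full training set. If each contributor should instead train on a different subset, the arrays can be split into shards. The following is a minimal sketch with NumPy; select_shard, contributor_index and num_contributors are hypothetical names that are not part of the original assets.

```python
import numpy as np

def select_shard(x, y, contributor_index, num_contributors, seed=0):
    """Return the slice of (x, y) assigned to one contributor.

    Hypothetical helper: every contributor shuffles with the same seed,
    so the shards are disjoint and together cover the whole dataset.
    """
    if not 0 <= contributor_index < num_contributors:
        raise ValueError("contributor_index out of range")
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(x))
    shard = np.array_split(order, num_contributors)[contributor_index]
    return x[shard], y[shard]

# Stand-in arrays with the MNIST image shape (10 samples of 28x28)
x = np.arange(10 * 28 * 28, dtype=np.float32).reshape(10, 28, 28)
y = np.arange(10)
x_part, y_part = select_shard(x, y, contributor_index=0, num_contributors=3)
print(x_part.shape, y_part.shape)
```

    Using a shared seed is one possible design choice; it avoids having to coordinate shard assignments beyond agreeing on an index per contributor.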

    The contributor can then load the model provided by the creator for the training:

    #assets/model_contributor/model_contributor.py
    def load_model(model_path, model_name):
        model = tf.keras.models.load_model(f"{model_path}/{model_name}")
        model.summary()  # summary() prints the table itself and returns None
        return model

    The contributor then trains the model according to the creator's instructions (e.g. epochs, batch_size):

    #assets/model_contributor/model_contributor.py
    def train_model(model, x_train, y_train, epoch=100, batch_size=32):
        model.fit(x_train, y_train, epochs=epoch, batch_size=batch_size)
        return model

    The generated model can finally be saved for the model creator to federate the model training results:

    #assets/model_contributor/model_contributor.py
    def save_model(model, output_path, model_name, contributo_id_length=10):
        contributor_id = get_random_string(contributo_id_length)
        model.save(f"{output_path}/{model_name}_{contributor_id}")

    Saving the model creates an output folder which includes the structure and the weights. The name of the folder includes the model name and an id for the contributor:

    example_model_hirabujcbw/
    ├── assets
    ├── fingerprint.pb
    ├── keras_metadata.pb
    ├── saved_model.pb
    └── variables
        ├── variables.data-00000-of-00001
        └── variables.index

    The contributor ID in this example is a randomly generated string used to uniquely identify the different models generated by the contributors as part of the training.
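
    The get_random_string helper called by save_model is not shown in this guide. One possible implementation, assuming the ID consists of lowercase ASCII letters as in the example folder name, is sketched below.

```python
import random
import string

def get_random_string(length=10):
    """Return a random string of lowercase ASCII letters.

    Assumed implementation: the original helper is not shown in this guide.
    """
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))

contributor_id = get_random_string(10)
print(f"example_model_{contributor_id}")
```

    Random IDs of this length make collisions unlikely but do not rule them out; a longer ID lowers the odds further.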

  5. Once the model_contributor has completed the task, the results can be sent using:

    #decentralml/send_task_result.py
    def send_task_result(substrate, keypair, submission_id, result, result_path, result_storage_type, result_storage_credentials)

    This function accepts a parameter result_path which will have to be set to the output folder containing the saved model. Sending the results uploads the model training results to a remote and/or shared storage.
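
    Before sending, it can be useful to confirm that result_path really points at a saved model folder. The check below is a sketch based on the folder listing shown in step 4; is_saved_model_dir is a hypothetical helper and not part of the DecentralML client.

```python
import tempfile
from pathlib import Path

def is_saved_model_dir(result_path):
    """Check that result_path looks like a saved model folder.

    Based on the folder listing in step 4: a saved_model.pb file and a
    variables/ subfolder are expected. Hypothetical helper.
    """
    folder = Path(result_path)
    return (folder / "saved_model.pb").is_file() and (folder / "variables").is_dir()

# Demonstration on a temporary folder that imitates the saved model layout
with tempfile.TemporaryDirectory() as tmp:
    model_dir = Path(tmp) / "example_model_hirabujcbw"
    (model_dir / "variables").mkdir(parents=True)
    (model_dir / "saved_model.pb").touch()
    valid = is_saved_model_dir(model_dir)
    empty = is_saved_model_dir(tmp)

print(valid, empty)  # True False
```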

  6. The model_creator can list the available results for each task using list_task_results (see Listing objects).

  7. Once a result is available, the model_creator can start validating it using validate_task_results. Validation can follow one of three policies:

    • AutoAccept: the results are automatically accepted

    • ManualAccept: the model_creator manually accepts each task result

    • CustomAccept: the model_creator can implement custom methods for automatically validating the results.
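
    For CustomAccept, the acceptance logic is left to the model_creator. The sketch below shows one possible rule, assuming the decision is based on the [loss, accuracy] pair returned by evaluate_model() for the model compiled in step 1. The function name custom_accept, the tolerance parameter and the rule itself are assumptions made for illustration.

```python
def custom_accept(baseline_metrics, candidate_metrics, tolerance=0.01):
    """Decide whether a contributor's result should be accepted.

    Both arguments are [loss, accuracy] pairs. The rule (accuracy must
    not drop by more than `tolerance` below the baseline) is an
    assumption made for this sketch.
    """
    _, baseline_accuracy = baseline_metrics
    _, candidate_accuracy = candidate_metrics
    return candidate_accuracy >= baseline_accuracy - tolerance

# Made-up numbers for illustration only
print(custom_accept([0.30, 0.91], [0.25, 0.93]))  # True
print(custom_accept([0.30, 0.91], [0.80, 0.75]))  # False
```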

    Starting the validation process downloads the results and the corresponding saved models. This example walks through a manual validation process. The model_creator begins by loading the models trained by the contributors. The functions used to federate the models are included in assets/model_creator/model_creator.py.

    #assets/model_creator/model_creator.py
    def load_contributors_models(contributors_models_path, model_name):
        model_contributors_folders_path = f"{contributors_models_path}/{model_name}_*"
        model_folders = glob.glob(model_contributors_folders_path)
        contributors_models = list()
        for model_folder in model_folders:
            model = tf.keras.models.load_model(model_folder)
            contributors_models.append(model)
        return contributors_models

    Then, the results of the trained models are federated by averaging the model weights:

    #assets/model_creator/model_creator.py
    def federate_contributors_model(contributors_models, policy="average"):
        client_weights = [model.trainable_variables for model in contributors_models]
    
        new_weights = None
    
        if policy=="average":
            # Compute the average weights for each layer
            avg_weights = [
                tf.reduce_mean(layer_weight_tensors, axis=0)
                for layer_weight_tensors in zip(*client_weights)
            ]
            new_weights = avg_weights
        
        return new_weights
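
    To make the layer-wise averaging concrete, the same computation can be traced on small arrays. This is a sketch with NumPy instead of TensorFlow; the weight values are made up and stand in for the per-layer variables of two contributors.

```python
import numpy as np

# Two contributors, each with the weights of a tiny two-array model
# (made-up values; real models hold one array per kernel and bias)
contributor_a = [np.array([[1.0, 2.0], [3.0, 4.0]]), np.array([0.0, 1.0])]
contributor_b = [np.array([[3.0, 4.0], [5.0, 6.0]]), np.array([2.0, 3.0])]
client_weights = [contributor_a, contributor_b]

# zip(*client_weights) groups the i-th array of every contributor together,
# so each mean is taken across contributors, one layer at a time
avg_weights = [
    np.mean(np.stack(layer_weights), axis=0)
    for layer_weights in zip(*client_weights)
]

print(avg_weights[0].tolist())  # [[2.0, 3.0], [4.0, 5.0]]
print(avg_weights[1].tolist())  # [1.0, 2.0]
```

    This unweighted mean gives every contributor the same influence. Weighting each contributor by the number of training samples is a common alternative, but it would require sample counts that this example does not collect.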

    Finally, the new weights can be applied to the same model structure that was originally created:

    #assets/model_creator/model_creator.py
    def set_model_weights(model, weights):
        model.set_weights(weights)
        return model

    This model can then be evaluated:

    #assets/model_creator/model_creator.py
    def evaluate_model(model, x_test, y_test):
        return model.evaluate(x_test, y_test)

  8. Once the validation process is complete, the model_creator or the automatic validation procedure can accept or reject the results using accept_task_results() or reject_task_results(), respectively.

    Accepting the results issues the payment to the contributor.
