Port Scripts to Use the Cray Programming Environment Deep Learning Plugin

Provides instructions for porting a TensorFlow training script to use the Cray PE DL plugin.

This procedure requires Urika-CS software installed on a Cray CS system.

To port a TensorFlow training script to use the Cray PE DL plugin, it is recommended to start with a training script that executes serially, i.e., one that does not use distributed TensorFlow. Starting from that script, the following modifications are necessary to use the plugin:
  1. Initializing the Cray PE DL plugin, specifying the number of teams, the number of threads, and the model size
  2. Broadcasting initial model parameter values
  3. Using the Cray PE DL plugin to communicate gradients after gradient computation and before the model update
  4. Finalizing the Cray PE DL plugin

Modifications that are not required by the Cray PE DL plugin, yet common for parallelization, include:

  • Correcting the definition of an epoch for the global mini-batch size (all processes)
  • Correcting the learning rate:
    • Linear or square root scaling rule
    • Adding a learning rate decay schedule
  • Averaging performance metrics using Cray PE DL plugin helper functions
  • Restricting output so that only a single rank produces the desired prints
  • Writing checkpoints from a single rank only, or having each rank write to a unique location
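
The learning rate adjustments listed above can be sketched as follows. This is a minimal illustration and not part of the Cray PE DL plugin: the function names scale_learning_rate and step_decay and all of their parameters are hypothetical, and the step decay shown is only one of many possible schedules.

```python
import math


def scale_learning_rate(base_lr, num_ranks, rule="linear"):
    """Scale a serial learning rate for training spread over num_ranks.

    Hypothetical helper: 'linear' multiplies by the number of ranks,
    'sqrt' multiplies by its square root.
    """
    if rule == "linear":
        return base_lr * num_ranks
    if rule == "sqrt":
        return base_lr * math.sqrt(num_ranks)
    raise ValueError("rule must be 'linear' or 'sqrt'")


def step_decay(lr, step, decay_every, factor=0.5):
    """Illustrative schedule: multiply lr by factor once every decay_every steps."""
    return lr * factor ** (step // decay_every)


# Example: a serial learning rate of 0.01 scaled for 16 ranks
lr_linear = scale_learning_rate(0.01, 16, "linear")
lr_sqrt = scale_learning_rate(0.01, 16, "sqrt")
```

Which rule works better depends on the model and the global batch size, so both are usually worth trying.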

The following code excerpts (from the MNIST example provided with the release) review the required modifications for using the Cray PE DL plugin.

  1. Initialize the Cray PE DL plugin.
    This is done by first importing the dl_comm module and calling its init() function as follows:
    Example of initializing the Cray PE DL plugin:
    # import the Cray PE DL Plugin module
    import dl_comm as mc
      
    def main():
     
      # Build the model
      model=build_model(n_in, n_layer, n_hid, n_out)
       
      # if the Plugin is enabled initialize it
      if (flags.enable_dl_comm == 1):
       
          # determine the model size
          totsize = sum([v.get_shape().num_elements() for v in tf.trainable_variables()])
           
          # initialize the Cray PE DL Plugin
          # (arguments: threads per team, number of teams, model size, framework)
          mc.init(2, 1, totsize, "tensorflow")
          ...
    Note the call to mc.init() includes the additional argument of tensorflow, which indicates that TensorFlow is the training framework. There are two effects from specifying tensorflow. First, calls to the Cray PE DL plugin operations for broadcasting initial model parameters (mc.broadcast()) and gradient aggregation (mc.gradients()) switch from expecting NumPy array data buffers to native TensorFlow Tensor buffers. Second, TensorFlow operations for both broadcast and gradient aggregation get loaded, which allows those calls to be added to the execution graph.
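
    The model size passed to mc.init() is the total number of trainable values in the model. The following sketch shows that count in plain Python, without TensorFlow, under the assumption that the size is the sum of the element counts of all variable shapes. The helper names and the example shapes are hypothetical.

```python
from functools import reduce
from operator import mul


def num_elements(shape):
    """Number of scalar values in a tensor with the given shape."""
    return reduce(mul, shape, 1)


def model_size(shapes):
    """Total number of trainable values across all variable shapes."""
    return sum(num_elements(s) for s in shapes)


# Hypothetical shapes for a small two-layer network (weights and biases)
shapes = [(784, 128), (128,), (128, 10), (10,)]
totsize = model_size(shapes)
```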

    The next initialization step is to configure the single thread team to be aware of the total number of training steps. Users first decide on the number of training epochs and number of samples in a mini-batch per process. The total number of training steps Msteps is then given by:

    Msteps = ntrain / (kranks × blocal) × Nepochs

    where Nepochs is the number of training epochs, ntrain is the number of samples in training dataset, kranks is the number of workers (MPI ranks or processes), and blocal is the local batch size.
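
    As a worked example of this formula, the sketch below computes the number of steps for a hypothetical run. The function name and the sample values are assumptions chosen for illustration, and the result is rounded up to a whole step.

```python
import math


def total_training_steps(n_train, k_ranks, b_local, n_epochs):
    """Msteps = ntrain / (kranks * blocal) * Nepochs, rounded up to a whole step."""
    global_batch = k_ranks * b_local  # samples consumed per step by all ranks
    return int(math.ceil(n_epochs * n_train / global_batch))


# Hypothetical run: 60,000 samples, 8 ranks, local batch of 32, 10 epochs
msteps = total_training_steps(60000, 8, 32, 10)
steps_per_epoch = total_training_steps(60000, 8, 32, 1)
```

    With these numbers the global mini-batch is 256 samples, so one epoch takes 235 steps and ten epochs take 2344 steps.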

    The configuration step is shown below, along with several suggestions for modifying the configuration. It continues from the previous code block:

    Example of configuring the Cray PE DL plugin initialization parameters:
    ...
    # use the CPE DL Plugin to get our rank and
    # the number of processes
    myrank     = mc.get_rank()
    numworkers = mc.get_nranks()
     
    # num_train_samps is the number of samples in
    # our training data set
    max_steps = int(math.ceil(flags.train_epochs * (num_train_samps) / (numworkers * batch_size)))
     
    # configure the single thread team and have rank 0 print out communication performance metrics
    # every 100 steps
    mc.config_team(0, 0, max_steps, max_steps, 1, 100)
     
     
    # The above configuration can be modified as needed. For example, to configure the team to complete
    # a blocking warm-up phase for the first 10% of training without a smooth transition:
    # mc.config_team(0, 0, int(0.10 * max_steps), max_steps, 1, 100)
    #
    # To instead use a smooth transition:
    # mc.config_team(0, 1, int(0.10 * max_steps), max_steps, 1, 100)
    #
    # To perform a cool-down blocking phase for the final 10% of training, make the following change:
    # mc.config_team(0, 1, -1*int(0.90 * max_steps), max_steps, 1, 100)
    ...
  2. Broadcast initial model parameters.

    With TensorFlow, it is ideal to use SessionRunHook to manage the broadcast and any desired averaging of metrics. Session hooks can be passed to convenience classes, such as Estimator, to be run at specific points in session management. A typical SessionRunHook for broadcasting initial model parameters is defined as follows:

    Example of broadcasting initial parameters in the SessionRunHook class:
    class BcastTensors(tf.train.SessionRunHook):
     
      def __init__(self):
        self.bcast = None
     
      def begin(self):
        if not self.bcast:
          new_vars   = mc.broadcast(tf.trainable_variables(),0)
          self.bcast = tf.group(*[tf.assign(v,new_vars[k]) for k,v in enumerate(tf.trainable_variables())])
     
      def after_create_session(self, session, coord):
        session.run(self.bcast)
    This SessionRunHook broadcasts the model parameters from rank 0 to all other MPI ranks and then assigns the new values. The actual operation is performed after the session is created. This SessionRunHook must be provided to the Estimator instance as follows:
    Example of supplying SessionRunHook to the Estimator instance:
    train_hooks = None
     
    # if the Cray PE DL Plugin is enabled add the hook
    if (dlcomm == 1):
        train_hooks = [BcastTensors()]
     
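    # model_fn must be a callable that returns an EstimatorSpec
    def model_fn(features, labels, mode):
        ...
        return tf.estimator.EstimatorSpec(mode=mode,
                                          predictions=predictions,
                                          loss=loss,
                                          train_op=train_op,
                                          training_hooks=train_hooks,
                                          eval_metric_ops=metrics)
     
    classifier = tf.estimator.Estimator(model_fn=model_fn)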
     
    # train() accepts only one of steps or max_steps; the other must be None
    classifier.train(input_fn=input_fn_train, steps=tsteps, max_steps=flags.max_train_steps)
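    To illustrate why the broadcast matters, the following sketch simulates it in plain Python without the plugin or MPI. Each simulated rank starts with its own randomly initialized parameters, and after the broadcast every rank holds a copy of the values from rank 0. The function simulate_broadcast and the list-of-lists layout are assumptions made for illustration and do not reflect how mc.broadcast() is implemented.

```python
import random


def simulate_broadcast(params_by_rank, root=0):
    """Return new per-rank parameters where every rank holds a copy of root's."""
    root_params = params_by_rank[root]
    return [list(root_params) for _ in params_by_rank]


# Four simulated ranks, each with different random initial parameters
random.seed(0)
params_by_rank = [[random.random() for _ in range(3)] for _ in range(4)]

synced = simulate_broadcast(params_by_rank, root=0)
all_equal = all(p == synced[0] for p in synced)
```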
  3. Perform gradient aggregation.
    Gradient aggregation is the communication- and performance-intensive operation that the Cray PE DL plugin is highly optimized for. It is placed between gradient computation and the model update as follows:
    Example of gradient aggregation and communication:
    def train(model_function, train_samp, eval_samp,
              batch_size):
     
        ...
        # often a serial code will use the
        # optimizer minimize() method
        if (dlcomm != 1):
     
          minimize_op = optimizer.minimize(loss, global_step)
     
        else:
          # for the Cray PE DL Plugin
          # we need to split out the minimize call below
          # so we can communicate/average gradients
          grads_and_vars = optimizer.compute_gradients(loss)
     
          grads     =  mc.gradients([gv[0] for gv in grads_and_vars], 0)
          gs_and_vs = [(g,v) for (_,v), g in zip(grads_and_vars, grads)]
     
          minimize_op = optimizer.apply_gradients(gs_and_vs, global_step)
        ...
    It is common for a serial training script to use the minimize() method of an optimizer. This method computes gradients and then updates the model with those gradients. For data-parallel training, however, the global reduction of local gradients must happen between those two steps. In the code block above, minimize() is split into compute_gradients() and apply_gradients() so that the mc.gradients() call can be added between them. This operation is added to the execution graph and has direct access to gradient Tensors, located in CPU or GPU memory, without additional buffering.
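    The effect of the aggregation step can be sketched in plain Python without TensorFlow or the plugin. The example below assumes that aggregation means an element-wise average across ranks, and it reuses the re-pairing pattern from the code block above. The function average_gradients and the sample values are hypothetical.

```python
def average_gradients(grads_by_rank):
    """Element-wise mean of each gradient across all simulated ranks."""
    num_ranks = len(grads_by_rank)
    return [sum(values) / num_ranks for values in zip(*grads_by_rank)]


# Each simulated rank computed (gradient, variable) pairs for variables "w" and "b"
grads_and_vars_by_rank = [
    [(1.0, "w"), (4.0, "b")],  # rank 0
    [(3.0, "w"), (0.0, "b")],  # rank 1
]

# Strip the variables, average the gradients, then re-pair them
grads_by_rank = [[g for g, _ in gv] for gv in grads_and_vars_by_rank]
avg = average_gradients(grads_by_rank)
gs_and_vs = [(g, v) for (_, v), g in zip(grads_and_vars_by_rank[0], avg)]
```

    After this step every rank would apply the same averaged gradients, which keeps the model replicas identical from one step to the next.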
  4. Finalize the plugin.
    The final required step for porting a serial training script is to finalize the Cray PE DL plugin, similar to finalizing MPI. This call should be added after training is complete and the session is closed. Often this is at the end of the main() function.
    Example of finalizing the Cray PE DL plugin:
    def main():
     
        ...
        # training is complete and we're ready to exit
        mc.finalize()
    Cray provides several examples with the Cray PE DL plugin package for users to reference. MNIST and tf_cnn_benchmarks examples are provided as part of this release. The tf_cnn_benchmarks example is commonly used to benchmark the performance across a set of standard CNNs.