Dask Gateway at Purdue AF

Dask Gateway is a service for managing Dask clusters in a multi-tenant environment such as the Purdue Analysis Facility.

To make Dask Gateway useful in a variety of analysis workflows, we provide four ways to work with it:

  • A Dask Gateway cluster can be created in two ways:

    • Interactively from the Dask Labextension interface

    • Manually in a Jupyter Notebook or in a Python script

  • With either of these methods, you can create two types of clusters:

    • Dask Gateway cluster with SLURM backend: workers are submitted to the Purdue Hammer cluster. This is available to Purdue users only due to Purdue data access policies.

      With this method, users can potentially create hundreds of workers, but in practice requesting more than 100 workers usually involves some wait time due to competition with CMS production jobs and other users.

    • Dask Gateway cluster with Kubernetes backend: workers are submitted to the Purdue Geddes cluster. This is available to all users.

      With this method, the workers are scheduled almost instantly, but for now we restrict the total per-user resource usage to 100 cores and 400 GB RAM due to limited resources in the Analysis Facility.

    The pros and cons of the Dask Gateway backends are summarized below:

    Dask Gateway + SLURM

      Pros

      • SLURM is familiar to current users

      • Easy to access logs and worker info via squeue

      Cons

      • Unavailable to CERN/FNAL users

      • Scheduling workers can be slow due to competition with CMS production jobs

    Dask Gateway + Kubernetes

      Pros

      • Fast scheduling of resources

      • Detailed monitoring

      • Available to CERN/FNAL users

      Cons

      • Limited total amount of resources

      • Retrieving detailed worker info can be non-trivial for users (but easy for admins)
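The Kubernetes per-user limits quoted above (100 cores, 400 GB RAM) translate into a maximum worker count once a per-worker size is chosen. The following is a minimal sketch of that arithmetic. The function name and the example worker sizes are illustrative and not part of any Dask Gateway API, and the estimate ignores any resources the scheduler itself may consume.

```python
# Per-user limits for the Kubernetes backend, as stated in this document.
MAX_CORES = 100
MAX_MEMORY_GB = 400


def max_kubernetes_workers(cores_per_worker: int, memory_gb_per_worker: int) -> int:
    """Largest number of identical workers that fits within both limits."""
    if cores_per_worker <= 0 or memory_gb_per_worker <= 0:
        raise ValueError("worker resources must be positive")
    by_cores = MAX_CORES // cores_per_worker
    by_memory = MAX_MEMORY_GB // memory_gb_per_worker
    # Whichever limit is exhausted first determines the answer.
    return min(by_cores, by_memory)


print(max_kubernetes_workers(1, 4))   # 100
print(max_kubernetes_workers(2, 16))  # 25
```

Workers with 1 core and 4 GB use both limits evenly; larger memory requests per core reduce the number of workers that fit.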

1. Creating Dask Gateway clusters

This section contains the instructions for creating Dask Gateway clusters using the methods described above.

  1. Click on the Dask logo in the left sidebar of the JupyterLab interface.

  2. Click the [+ NEW] button to open the dialog window with cluster settings.

  3. In the dialog window, select cluster type, kernel, and desired worker resources.

  4. Click the [Apply] button and wait about 1 minute; the cluster info will then appear in the sidebar.

  5. The sidebar should automatically connect to Dask dashboards; you can open different dashboards by clicking on yellow buttons in the sidebar, and rearrange the tabs as desired.

_images/dask-gateway.png
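For the manual method (Jupyter Notebook or Python script), a minimal sketch using the dask_gateway client could look as follows. It assumes that the default Gateway() configuration targets the SLURM backend, as in the connection snippet in the cluster discovery section, and that the facility's default cluster options are acceptable. The worker count of 10 is arbitrary, and start_cluster is an illustrative helper name.

```python
def start_cluster(gateway, n_workers=10):
    """Create a cluster with default options, scale it, and return (cluster, client)."""
    cluster = gateway.new_cluster()  # uses the default cluster options of the facility
    cluster.scale(n_workers)         # request a fixed number of workers
    client = cluster.get_client()
    return cluster, client


def main():
    # Imported here so the helper above does not require dask_gateway at import time.
    from dask_gateway import Gateway

    gateway = Gateway()  # assumption: the default configuration targets the SLURM backend
    cluster, client = start_cluster(gateway, n_workers=10)
    print(cluster.name, cluster.dashboard_link)
```

Call main() from a notebook cell or a script running inside the Analysis Facility. For the Kubernetes backend, construct Gateway with the address and proxy_address arguments shown in the cluster discovery section.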

2. Shared environments and storage volumes

There are multiple ways to ensure that the workers have access to specific storage volumes, Conda environments, Python packages, C++ libraries, etc.

  • Shared storage

    Dask workers have the same permissions as the user that creates them. You can use this to your advantage if your workers read/write data to/from storage locations.

    Refer to the following table to decide which Dask Gateway setup works best in your case:

    Storage   SLURM workers    Kubernetes workers   Kubernetes workers
              (Purdue users)   (Purdue users)       (CERN/FNAL users)
    -------   --------------   ------------------   ------------------
    /home/    no access        no access            no access
    /work/    no access        read / write         read / write
    Depot     read / write     read / write         read-only
    CVMFS     read-only        read-only            read-only
    EOS       read-only        read-only            read-only

  • Conda environments / Jupyter kernels

    Any Conda environment that is used in your analysis can be propagated to Dask workers. The only caveat is that the workers must have read access to the storage volume where the environment is stored (see table above). For example, SLURM workers will not be able to see Conda environments located in /work/ storage.

    The Conda environment / Jupyter kernel can be selected from a drop-down list in the dialog window that appears when you click the [+ NEW] button.

    To make your Conda environment appear as a kernel, it must have the ipykernel package installed.

    _images/dask-gateway-dialog.png
  • Environment variables

    Passing environment variables to workers can be beneficial in various ways, for example:

    • Enable imports from local Python (sub)modules by amending the PYTHONPATH variable.

    • Enable imports from C++ libraries by amending the LD_LIBRARY_PATH variable.

    • Allow workers to read data via XRootD by specifying the path to the VOMS proxy via the X509_USER_PROXY variable.

    These and other environment variables can be passed to Dask workers as follows:

    When a Dask Gateway cluster is created via the JupyterLab extension, there is no direct interface to pass environment variables to workers.

    Instead, we use the following workaround to override the worker environment:

    1. Create a file ~/.config/dask/labextension.yaml

    2. Add any environment variables in the following way:

      # contents of labextension.yaml
      labextension:
         env_override:
            KEY1: VALUE1
            X509_USER_PROXY: "/path-to-proxy/"
            # any other variables..
      
    3. Shut down and restart the Analysis Facility session

    4. Create a new cluster by clicking the [+ NEW] button in the left sidebar.
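The file from steps 1 and 2 can also be generated from Python, for example when the variables are computed at run time. The sketch below uses only the standard library. The helper names render_labextension_yaml and write_labextension_yaml are hypothetical, and the file layout (labextension, then env_override) is taken from the example above. Note that writing the file replaces any existing labextension.yaml.

```python
import json
import re
from pathlib import Path

# Conventional shape of an environment variable name.
_ENV_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def render_labextension_yaml(env):
    """Return YAML text with `env` placed under labextension.env_override."""
    lines = ["labextension:", "  env_override:"]
    for key, value in env.items():
        if not _ENV_NAME.match(key):
            raise ValueError(f"not a valid environment variable name: {key!r}")
        # json.dumps yields a double-quoted string, which is also valid YAML.
        lines.append(f"    {key}: {json.dumps(str(value))}")
    return "\n".join(lines) + "\n"


def write_labextension_yaml(env, path=None):
    """Write the rendered YAML to `path` (default: ~/.config/dask/labextension.yaml)."""
    if path is None:
        path = Path.home() / ".config" / "dask" / "labextension.yaml"
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(render_labextension_yaml(env))  # overwrites an existing file
    return path


print(render_labextension_yaml({"X509_USER_PROXY": "/path-to-proxy/"}))
```

After writing the file, steps 3 and 4 still apply: the session must be restarted before a new cluster picks up the variables.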

3. Monitoring

Monitoring your Dask jobs is possible in two ways:

  1. Via the Dask dashboard that is created for each cluster (see instructions below).

  2. Via the general Purdue AF monitoring page, in the “Slurm metrics” and “Dask metrics” sections of the monitoring dashboard.

Instructions to open Dask cluster dashboards for different Gateway setups:

When a cluster is created via the Dask Labextension interface, the extension should connect to monitoring dashboards automatically; you can open various dashboards by clicking on the yellow buttons in the sidebar.

Alternatively, you can copy the URL from the window at the top of the Labextension sidebar, and open the Dask dashboard in a separate web browser tab.

_images/dask-gateway.png
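When working from a notebook or script rather than the Labextension sidebar, the dashboard URL can also be read through the dask_gateway client. The sketch below assumes a gateway object constructed as in the cluster discovery section; dashboard_links is a hypothetical helper name.

```python
def dashboard_links(gateway):
    """Map the name of each running cluster to its dashboard URL."""
    return {report.name: report.dashboard_link for report in gateway.list_clusters()}


def main():
    # Imported here so the helper above does not require dask_gateway at import time.
    from dask_gateway import Gateway

    gateway = Gateway()  # assumption: the default configuration targets the SLURM backend
    for name, link in dashboard_links(gateway).items():
        print(name, link)
```

Each printed URL can be opened in a separate web browser tab, in the same way as the URL copied from the sidebar.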

4. Cluster discovery and connecting a client

In general, connecting a client to a Gateway cluster is done as follows:

client = cluster.get_client()

However, this assumes that cluster refers to an already existing object. That is the case if the cluster was created in the same notebook or Python script, but in most cases we recommend keeping the cluster separate from the clients.

Below are the different ways to connect a client to a cluster created elsewhere:

The following snippet discovers the cluster and connects to it automatically, as long as the cluster exists.

from dask_gateway import Gateway

# If submitting workers as SLURM jobs (Purdue users only):
gateway = Gateway()

# If submitting workers as Kubernetes pods (all users):
# gateway = Gateway(
#     "http://dask-gateway-k8s.geddes.rcac.purdue.edu/",
#     proxy_address="traefik-dask-gateway-k8s.cms.geddes.rcac.purdue.edu:8786",
# )

clusters = gateway.list_clusters()
if not clusters:
    raise RuntimeError("No running Dask Gateway clusters found")
# for example, select the first of the existing clusters
cluster_name = clusters[0].name
cluster = gateway.connect(cluster_name)
client = cluster.get_client()

Caution

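If you have more than one Dask Gateway cluster running, automatic detection is ambiguous: the snippet above connects to whichever cluster is listed first, which may not be the one you intended.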
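One way to avoid the ambiguity is to refuse to guess when several clusters are running and to require an explicit name instead. This is a sketch under that assumption. pick_cluster is a hypothetical helper, and it relies only on the name attribute of the objects returned by gateway.list_clusters().

```python
def pick_cluster(reports, name=None):
    """Return the cluster name to connect to, refusing to guess when ambiguous."""
    names = [report.name for report in reports]
    if name is not None:
        if name not in names:
            raise LookupError(f"cluster {name!r} not found; running clusters: {names}")
        return name
    if not names:
        raise LookupError("no running Dask Gateway clusters found")
    if len(names) > 1:
        raise LookupError(f"several clusters are running, pass one explicitly: {names}")
    return names[0]


# Usage with a gateway object created as shown above:
# cluster_name = pick_cluster(gateway.list_clusters())
# client = gateway.connect(cluster_name).get_client()
```

The cluster name to pass explicitly is displayed in the Labextension sidebar and is also available as cluster.name on the object returned at creation time.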

5. Cluster lifetime and timeouts

  • Cluster creation will fail if the scheduler does not start within 2 minutes. If this happens, try creating the cluster again.

  • Once created, the Dask scheduler and workers will persist for 1 day.

  • If the notebook from which the Dask Gateway cluster was created is terminated, the cluster and all its workers will be killed after 1 hour.
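When clusters are created from a script, the advice to retry after a failed start can be automated. The following is a minimal sketch: new_cluster_with_retry is a hypothetical helper, the number of attempts and the pause are arbitrary, and the broad except clause is deliberate because the exact exception type raised on a scheduler timeout is not assumed here.

```python
import time


def new_cluster_with_retry(gateway, attempts=3, pause_seconds=10.0):
    """Call gateway.new_cluster() up to `attempts` times and return the first success."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return gateway.new_cluster()
        except Exception as error:  # broad on purpose: exception type not assumed
            last_error = error
            print(f"attempt {attempt}/{attempts} failed: {error}")
            if attempt < attempts:
                time.sleep(pause_seconds)
    raise RuntimeError(f"cluster creation failed after {attempts} attempts") from last_error
```

When the work is finished, calling cluster.shutdown() releases the scheduler and workers without waiting for the timeouts listed above.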