To run a task on a cluster, use the docker-cluster profile. With this profile, all the instances are started and connected to the same network. In addition, a shared directory is created at /share; it contains the file qhost, which lists, one per line, the IP address of each instance.
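As a minimal illustration of the /share/qhost format described above (the read_ips helper and the sample IP addresses are made up for this example; on a real cluster the file path would be /share/qhost):

```shell
#!/bin/bash
# read_ips is a hypothetical helper: load one IP address per line into the IPs array
read_ips() {
    readarray -t IPs < "$1"
}

# Stand-in for /share/qhost: one IP address per line, indexed by instance number
printf '10.0.0.1\n10.0.0.2\n10.0.0.3\n' > /tmp/qhost.example
read_ips /tmp/qhost.example
echo "instance 1 has IP ${IPs[1]}"
```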
When using this profile, the constant DOCKER_CMD is replaced by:
- DOCKER_CMD_MASTER: the command run on instance number 0.
- DOCKER_CMD_WORKER: the command run on all the other instances.

Below are a few example scripts demonstrating how the common network can be used. Each one adds a layer of complexity on top of the previous one.
Script_ping.sh
#!/bin/bash
# Log function
LOG_PATH=/job/${INSTANCE_ID}.log
log_info() {
    echo "$(date +"%Y-%m-%d %H:%M:%S") || '$*'" 2>&1 | tee -a "${LOG_PATH}"
}

# Wait for each instance to reach this step by creating a file in the /share
# directory and waiting for all the instances to have created their files
wait_for() {
    mkdir -p "/share/$1"
    touch "/share/$1/$INSTANCE_ID.ready"
    while [ "$(find "/share/$1" -maxdepth 1 -type f -name '*.ready' | wc -l)" != "$INSTANCE_COUNT" ]; do
        log_info "Not all instances are $1"
        sleep 10
    done
    log_info "All instances are $1"
}

# Wait for /share/qhost to be ready
while [ ! -r /share/qhost ]; do
    log_info "/share/qhost not ready, retry in 3 seconds..."
    sleep 3
done
log_info "/share/qhost is available"

# Set IPs
readarray -t IPs < /share/qhost

# Wait for all the instances to be up and ready
wait_for ready

# Ping the other instances
log_info "INSTANCE ${INSTANCE_ID} pinging..."
for ip in "${IPs[@]}"; do
    ping -c 1 "$ip" >> "${LOG_PATH}" 2>> "${LOG_PATH}"
    EXIT_CODE="$?"
    if [ "$EXIT_CODE" -ne 0 ]
    then
        break
    fi
done

# Wait for all pings before exiting
wait_for "done"

# Exit with the proper status code
if [ "$EXIT_CODE" -eq 0 ]
then
    log_info "All pings were successful on this instance"
    exit 0
else
    log_info "Some pings didn't succeed on this instance"
    exit "$EXIT_CODE"
fi
Script_SSH.sh
#!/bin/bash
# Log function
LOG_PATH=/job/${INSTANCE_ID}.log
log_info() {
    echo "$(date +"%Y-%m-%d %H:%M:%S") || '$*'" 2>&1 | tee -a "${LOG_PATH}"
}

# Wait for each instance to reach this step by creating a file in the /share
# directory and waiting for all the instances to have created their files
wait_for() {
    mkdir -p "/share/$1"
    touch "/share/$1/$INSTANCE_ID.ready"
    while [ "$(find "/share/$1" -maxdepth 1 -type f -name '*.ready' | wc -l)" != "$INSTANCE_COUNT" ]; do
        log_info "Not all instances are $1"
        sleep 10
    done
    log_info "All instances are $1"
}

# Start the ssh daemon
/usr/sbin/sshd -D &

# Create the file that the other instances will copy
mkdir -p /job/files
touch /job/files/"${INSTANCE_ID}".txt

# Wait for /share/qhost to be ready
while [ ! -r /share/qhost ]; do
    log_info "/share/qhost not ready, retry in 3 seconds..."
    sleep 3
done
log_info "/share/qhost is available"

readarray -t IPs < /share/qhost

# Wait for all the instances to be up and ready
wait_for ready

# Copy the file created by every other instance over scp
log_info "${INSTANCE_ID} copying files"
mkdir -p /root/files
for ip in "${IPs[@]}"; do
    scp -o StrictHostKeyChecking=no root@"${ip}":/job/files/* /root/files >> "${LOG_PATH}" 2>> "${LOG_PATH}"
    EXIT_CODE="$?"
    if [ "$EXIT_CODE" -ne 0 ]
    then
        break
    fi
done
log_info "$(ls -R /root/files)"
if [ "$(ls /root/files | wc -l)" != "$INSTANCE_COUNT" ]
then
    log_info "Couldn't copy all files, copied $(ls /root/files | wc -l)"
else
    log_info "**** Execution successful! ****"
fi

wait_for "done"

# Exit with the proper status code
if [ "$EXIT_CODE" -eq 0 ]
then
    log_info "All the executions were successful"
    kill "$(cat /var/run/sshd.pid)"
    exit 0
else
    log_info "Some scp didn't succeed on this instance"
    exit "$EXIT_CODE"
fi
Script_mpi_hostname.sh
#!/bin/bash
# Log function
LOG_PATH=/job/${INSTANCE_ID}.log
log_info() {
    echo "$(date +"%Y-%m-%d %H:%M:%S") || '$*'" 2>&1 | tee -a "${LOG_PATH}"
}

# Wait for each instance to reach this step by creating a file in the /share
# directory and waiting for all the instances to have created their files
wait_for() {
    mkdir -p "/share/$1"
    touch "/share/$1/$INSTANCE_ID.ready"
    while [ "$(find "/share/$1" -maxdepth 1 -type f -name '*.ready' | wc -l)" != "$INSTANCE_COUNT" ]; do
        log_info "Not all instances are $1"
        sleep 10
    done
    log_info "All instances are $1"
}

# Start the ssh daemon
/usr/sbin/sshd -D &

# Wait for /share/qhost to be ready
while [ ! -r /share/qhost ]; do
    log_info "/share/qhost not ready, retry in 10 seconds..."
    sleep 10
done
log_info "/share/qhost is available"

readarray -t IPs < /share/qhost

# Wait for all the instances to be up and ready
wait_for ready

# Register every instance in the known hosts
for ip in "${IPs[@]}"; do
    ssh-keyscan -H "$ip" >> ~/.ssh/known_hosts 2>> "${LOG_PATH}"
done
# There are three keys per host, so the following gives the number of added known hosts
log_info "Number of known hosts $(( $(wc -l < ~/.ssh/known_hosts) / 3 ))"

# Add this instance's IP address and slot count to the shared MPI host file
printf "%s slots=%s\n" "${IPs[${INSTANCE_ID}]}" "$(nproc)" >> /share/mpihosts

# Wait for all the instances to have filled in the host files
wait_for mpiready

if [ "$INSTANCE_ID" = "0" ]
then
    cat /share/mpihosts
    # Sum the slot counts of all the instances
    while read -r line; do
        SLOTS=$((SLOTS + ${line#*=}))
    done < /share/mpihosts
    log_info "total slots: ${SLOTS}"
    log_info "executing /usr/bin/mpirun --allow-run-as-root -np ${SLOTS} -hostfile /share/mpihosts hostname"
    /usr/bin/mpirun --allow-run-as-root -np "${SLOTS}" -hostfile /share/mpihosts hostname
fi
EXIT_CODE="$?"

wait_for "done"

# Stop the ssh daemon
kill "$(cat /var/run/sshd.pid)"

# Exit with the proper status code
if [ "$EXIT_CODE" -eq 0 ]
then
    log_info "All the executions were successful"
    exit 0
else
    log_info "An error happened"
    exit "$EXIT_CODE"
fi
- script_ping.sh makes every instance ping every other one and demonstrates how to retrieve the IPs.
- script_ssh.sh makes every instance set up a passwordless SSH connection with every other one and demonstrates its use by copying a file from every other instance. Note that, to communicate through SSH, each instance needs an SSH key pair. In this case, the Docker container was built with a key pair included, which ensures every instance shares the same pair.
- script_mpi_hostname.sh shows how to set up a host file for OpenMPI and start the standard hostname program through MPI. Note, however, that using MPI on a standard cluster may lead to poor results, since our standard network is not optimized for this use. If you need a high-performance cluster with low-latency communication between nodes, get in touch with us at qlab@qarnot-computing.com so that we can give you access to the appropriate hardware setup.

These three scripts are provided as part of our example container, so running them does not require any input data. To run them, simply install our Python SDK, copy the run.py script below, replace <<<MY_SECRET_TOKEN>>> with your Qarnot token and follow the output using Tasq. You will notice in the scripts that it is often useful to wait for the other instances to be ready before moving to the next step; this is done repeatedly through the custom wait_for function.
Python
#!/usr/bin/env python3
import qarnot
conn = qarnot.Connection(client_token='<<<MY_SECRET_TOKEN>>>')
task = conn.create_task('cluster-example', 'docker-cluster', 3)
task.constants["DOCKER_REPO"] = "qarnotlab/cluster-example"
task.constants["DOCKER_TAG"] = "v1"
# May be changed to /opt/script_ssh.sh or /opt/script_mpi_hostname.sh
task.constants['DOCKER_CMD_MASTER'] = "/bin/bash /opt/script_ping.sh"
task.constants['DOCKER_CMD_WORKER'] = "/bin/bash /opt/script_ping.sh"
output_bucket = conn.create_bucket('cluster-example-output')
task.results = output_bucket
task.submit()
Bash
#!/bin/bash
# Your info
export QARNOT_CLIENT_TOKEN="<<<MY_SECRET_TOKEN>>>"

# Create a bucket for the output data
qarnot bucket create \
    --name cluster-example-output

# Create and run the task
qarnot task create \
    --name cluster-example \
    --profile docker-cluster \
    --instance 3 \
    --result cluster-example-output \
    --constants \
        "DOCKER_REPO=qarnotlab/cluster-example" \
        "DOCKER_TAG=v1" \
        "DOCKER_CMD_MASTER=/bin/bash /opt/script_ping.sh" \
        "DOCKER_CMD_WORKER=/bin/bash /opt/script_ping.sh"

A Qarnot cluster is composed of one Master node and N Worker nodes. As mentioned above, their behavior can be controlled mainly with the DOCKER_CMD_MASTER and DOCKER_CMD_WORKER constants. Below are a few characteristics specific to clusters:
- All the nodes run the same container image, specified by the DOCKER_REPO constant.
- All Worker nodes run the same DOCKER_CMD_WORKER command at run time. This can be further modified by the use of environment variables like $INSTANCE_ID.
- Each node has its own /job directory.
- The /share directory is shared between all nodes with read/write access. It can be used to get all the node IP addresses from the /share/qhost file, to create beacon files to coordinate between nodes in a more complex and precise way, and more. However, it is not meant for high-bandwidth data transfers.
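As a sketch of the $INSTANCE_ID point above (the role_of helper is made up for this example), a single entry script can branch on the instance number so the same command behaves differently on each node:

```shell
#!/bin/bash
# Decide the role from the instance number:
# in a docker-cluster task, instance 0 is the Master
role_of() {
    if [ "$1" = "0" ]; then echo master; else echo worker; fi
}

# INSTANCE_ID is provided by the environment on a real cluster;
# default to 0 here so the snippet also runs standalone
echo "instance ${INSTANCE_ID:-0} acts as $(role_of "${INSTANCE_ID:-0}")"
```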