Generation

code to explanationTue, 21 Feb 2023

data_directory = os.getenv("DATA_DIRECTORY") nextflow_status_file = os.path.join(data_directory, "nextflow_status.json") nextflow_status = {} NEXTFLOW_STATUS_LOCK.acquire() if os.path.exists(nextflow_status_file): with open(nextflow_status_file, "r") as f: nextflow_status = json.load(f) s: dict = await request.json() event = s["event"] experiment_dir = None if event != "started" and event != "completed": event = "processing" else: experiment_dir = os.path.basename(s["metadata"]["workflow"]["launchDir"]) run_id = s["runId"] run_status = nextflow_status.get(run_id, dict()) run_status["status"] = event run_status["last_update"] = s["utcTime"] if experiment_dir is not None: run_status["experiment_directory"] = experiment_dir nextflow_status[run_id] = run_status with open(nextflow_status_file, "w") as f: f.write(json.dumps(nextflow_status, indent=4)) NEXTFLOW_STATUS_LOCK.release()

Questions about programming?Chat with your personal AI assistant