Skip to content

Commit

Permalink
feat: add parallel logic
Browse files Browse the repository at this point in the history
  • Loading branch information
orangekame3 committed Jan 22, 2025
1 parent de6c4b5 commit c62b7e5
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 11 deletions.
9 changes: 9 additions & 0 deletions backend/qcflow/deployment/oqtopus/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from prefect import serve
from prefect.client.schemas.schedules import CronSchedule
from qcflow.main import main_flow

# from qcflow.subflow.concurrent.flow import concurrent_flow
from qcflow.subflow.one_qubit_daily_summary.flow import one_qubit_daily_summary_flow
from qcflow.subflow.qubex.flow import cal_flow
from qcflow.subflow.scheduler.flow import scheduler_flow
from qcflow.subflow.service_close.service_close import (
qpu_close_flow,
Expand Down Expand Up @@ -122,6 +125,11 @@
),
is_schedule_active=True,
)
cal_flow_deploy = cal_flow.to_deployment(
name=f"{deployment_name}-cal-flow",
description="""This is a cal flow.
""",
)

_ = serve(
main_deploy, # type: ignore
Expand All @@ -131,6 +139,7 @@
simulator_open_deploy, # type: ignore
simulator_close_deploy, # type: ignore
one_qubit_daily_summary_deploy, # type: ignore
cal_flow_deploy, # type: ignore
webserver=True,
limit=50,
)
81 changes: 75 additions & 6 deletions backend/qcflow/subflow/qubex/flow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import asyncio

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.task_runners import SequentialTaskRunner
from qcflow.schema.menu import Menu
from qcflow.subflow.qubex.task import (
Expand All @@ -12,23 +15,24 @@


@flow(
name="qubex-flow",
task_runner=SequentialTaskRunner(),
log_prints=True,
flow_run_name="{execution_id}",
flow_run_name="{sub_index}-{qubits}",
)
def qubex_flow(
def cal_flow(
menu: Menu,
calib_dir: str,
successMap: dict[str, bool],
execution_id: str,
qubits: list[int],
sub_index: int = 0,
) -> dict[str, bool]:
"""deployment to run calibration flow for a single qubit"""
logger = get_run_logger()
logger.info(f"Menu name: {menu.name}")
logger.info(f"Qubex version: {get_package_version('qubex')}")
logger.info(f"Qubits: {qubits}")
exp = Experiment(
chip_id="64Q",
qubits=[5, 7],
qubits=qubits,
config_dir="/home/shared/config",
)
exp.note.clear()
Expand All @@ -41,6 +45,7 @@ def qubex_flow(
qubex_version=get_package_version("qubex"),
fridge_temperature=0.0,
chip_id="SAMPLE",
sub_index=sub_index,
)
prev_result = TaskResult(
name="dummy", upstream_task="", status=TaskStatus.SCHEDULED, message=""
Expand Down Expand Up @@ -69,3 +74,67 @@ def qubex_flow(
logger.info("Ending all processes")
execution_manager.end_execution()
return successMap


async def trigger_cal_flow(
menu: Menu,
calib_dir: str,
successMap: dict[str, bool],
execution_id: str,
qubits: list[list[int]],
):
"""Trigger calibration flow for all qubits"""
deployments = []
for index, qubit in enumerate(qubits):
parameters = {
"menu": menu.model_dump(),
"calib_dir": calib_dir,
"successMap": successMap,
"execution_id": execution_id,
"qubits": qubit,
"sub_index": index,
}
deployments.append(run_deployment("cal-flow/oqtopus-cal-flow", parameters=parameters))

results = await asyncio.gather(*deployments)

print("All Qubits Completed:")
for result in results:
print(result)


def organize_qubits(qubits: list[list[int]], parallel: bool) -> list[list[int]]:
if parallel:
return qubits
else:
# Flatten the list into a single list and return as a single sublist
merged_qubits = [q for sublist in qubits for q in sublist]
return [merged_qubits]


@flow(
name="qubex-flow",
task_runner=SequentialTaskRunner(),
log_prints=True,
flow_run_name="{execution_id}",
)
async def qubex_flow(
menu: Menu,
calib_dir: str,
successMap: dict[str, bool],
execution_id: str,
) -> dict[str, bool]:
logger = get_run_logger()
logger.info(f"Menu name: {menu.name}")
logger.info(f"Qubex version: {get_package_version('qubex')}")
parallel = True
if parallel:
logger.info("parallel is True")
qubits = organize_qubits(menu.one_qubit_calib_plan, parallel)
await trigger_cal_flow(menu, calib_dir, successMap, execution_id, qubits)
return successMap
else:
qubits = organize_qubits(menu.one_qubit_calib_plan, parallel)
logger.info("parallel is False")
await trigger_cal_flow(menu, calib_dir, successMap, execution_id, qubits)
return successMap
44 changes: 43 additions & 1 deletion backend/qcflow/subflow/qubex/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ class ExecutionStatus(str, Enum):
# status: str = Field(..., description="Overall status of the process")


class CalibResult(BaseModel):
qubit_data: dict[str, dict[str, float | int]]

def put_result(self, qubit_id: str, key: str, value: float | int):
"""
add a result to the qubit data.
"""
if qubit_id not in self.qubit_data:
self.qubit_data[qubit_id] = {}
self.qubit_data[qubit_id][key] = value

def get_result(self, qubit_id: str, key: str) -> float | int | None:
"""
get a result from the qubit data.
"""
return self.qubit_data.get(qubit_id, {}).get(key)


class TaskResult(BaseModel):
name: str
upstream_task: str
Expand Down Expand Up @@ -109,6 +127,8 @@ class ExecutionManager(BaseModel):
start_at: str = ""
end_at: str = ""
elapsed_time: str = ""
qubit_results: CalibResult = CalibResult(qubit_data={})
sub_index: int = 0

def __init__(
self,
Expand Down Expand Up @@ -285,7 +305,7 @@ def save(self):
"""
Save the task manager to a file.
"""
save_path = f"{self.calib_data_path}/calib_data.json"
save_path = f"{self.calib_data_path}/calib_data_{self.sub_index}.json"
with open(save_path, "w") as f:
f.write(json.dumps(self.model_dump(), indent=4))

Expand Down Expand Up @@ -411,3 +431,25 @@ def end_execution(self) -> None:
# save_path = f"{self.calib_data_path}/{task_name}_history.json"
# with open(save_path, "w") as f:
# f.write(json.dumps(self.export_task_history(task_name).model_dump(), indent=4))

def put_calibration_value(self, qubit_id: str, key: str, value: float):
"""
add a calibration value to the qubit data.
"""
self.qubit_results.put_result(qubit_id, key, value)
self.save_qubit_data(qubit_id)

def get_calibration_value(self, qubit_id: str, key: str):
"""
get a calibration value from the qubit data.
"""
return self.qubit_results.get_result(qubit_id, key)

def save_qubit_data(self, qubit_id: str):
"""
save the qubit data to a file.
"""
save_path = f"{self.calib_data_path}/{qubit_id}.json"
with open(save_path, "w") as f:
json.dump({qubit_id: self.qubit_results.qubit_data[qubit_id]}, f, indent=4)
print(f"Real-time data saved for qubit {qubit_id}")
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

class ReadoutClassification(BaseTask):
task_name: str = "ReadoutClassification"
output_parameters: dict = {"readout_fidelity": {}}
output_parameters: dict = {"average_readout_fidelity": {}}

def __init__(self):
pass

def execute(self, exp: Experiment, execution_manager: ExecutionManager):
readout_result = exp.build_classifier()
exp.save_defaults()
self.output_parameters["readout_fidelity"] = readout_result["average_fidelity"]
self.output_parameters["average_readout_fidelity"] = readout_result[
"average_readout_fidelity"
]
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
"effective_freq"
]
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in effective_control_frequency_result["effective_freq"]: # type: ignore
execution_manager.put_calibration_value(
qubit,
"effective_qubit_frequency",
effective_control_frequency_result["effective_freq"][qubit], # type: ignore
)
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
exp.save_defaults()
self.output_parameters["qubit_frequency"] = qubit_frequency
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in qubit_frequency:
execution_manager.put_calibration_value(
qubit, "qubit_frequency", qubit_frequency[qubit]
)
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
exp.save_defaults()
self.output_parameters["readout_frequency"] = readout_frequency
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in readout_frequency:
execution_manager.put_calibration_value(
qubit, "readout_frequency", readout_frequency[qubit]
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
t1_values[qubit] = t1_result.data[qubit].t1 if qubit in t1_result.data else None
self.output_parameters["t1"] = t1_values
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in t1_values:
execution_manager.put_calibration_value(qubit, "t1", t1_values[qubit])
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
t2_values[qubit] = t2_result.data[qubit].t2 if qubit in t2_result.data else None
self.output_parameters["t2"] = t2_values
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in t2_values:
execution_manager.put_calibration_value(qubit, "t2", t2_values[qubit])
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
)
self.output_parameters["hpi_amplitude"] = hpi_amplitudes
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in hpi_amplitudes:
execution_manager.put_calibration_value(qubit, "hpi_amplitude", hpi_amplitudes[qubit])
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
)
self.output_parameters["pi_amplitude"] = pi_amplitudes
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in pi_amplitudes:
execution_manager.put_calibration_value(qubit, "pi_amplitude", pi_amplitudes[qubit])
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class CreateDRAGHPIPulse(BaseTask):
task_name: str = "CreateDRAGHPIPulse"
output_parameters: dict = {"drag_pi_coeff": {}, "drag_hpi_amplitude": {}}
output_parameters: dict = {"drag_hpi_coeff": {}, "drag_hpi_amplitude": {}}

def __init__(
self,
Expand Down Expand Up @@ -51,5 +51,12 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
)
exp.save_defaults()
self.output_parameters["drag_hpi_amplitude"] = drag_hpi_result["amplitude"]
self.output_parameters["drag_pi_coeff"] = drag_hpi_result["beta"]
self.output_parameters["drag_hpi_coeff"] = drag_hpi_result["beta"]
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in drag_hpi_result["amplitude"]:
execution_manager.put_calibration_value(
qubit, "drag_hpi_amplitude", drag_hpi_result["amplitude"][qubit]
)
execution_manager.put_calibration_value(
qubit, "drag_hpi_coeff", drag_hpi_result["beta"][qubit]
)
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,10 @@ def execute(self, exp: Experiment, execution_manager: ExecutionManager):
self.output_parameters["drag_pi_amplitude"] = drag_pi_result["amplitude"]
self.output_parameters["drag_pi_coeff"] = drag_pi_result["beta"]
execution_manager.put_output_parameters(self.task_name, self.output_parameters)
for qubit in drag_pi_result["amplitude"]:
execution_manager.put_calibration_value(
qubit, "drag_pi_amplitude", drag_pi_result["amplitude"][qubit]
)
execution_manager.put_calibration_value(
qubit, "drag_pi_coeff", drag_pi_result["beta"][qubit]
)
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ const presets = {
"CheckT1",
"CheckT2",
"CheckEffectiveQubitFrequency",
"CreateDRAGHPIPulse",
"CheckDRAGHPIPulse",
"CreateDRAGPIPulse",
"CheckDRAGPIPulse",
"ReadoutClassification",
"RandomizedBenchmarking",
],
preset1: ["Example1", "Example2", "Example3"],
preset2: ["Example1", "Example2", "Example3", "Example4"],
Expand Down

0 comments on commit c62b7e5

Please sign in to comment.