Skip to content

Commit

Permalink
feat: add queue information details for devices and jobs (#123)
Browse files Browse the repository at this point in the history
* feat: add queue information

* add device mock for failing test

* update sdk constraint
  • Loading branch information
virajvchaudhari committed Oct 18, 2023
1 parent a30bedf commit 5953cce
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 5 deletions.
34 changes: 34 additions & 0 deletions qiskit_braket_provider/providers/braket_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Iterable, Union, List

from braket.aws import AwsDevice, AwsQuantumTaskBatch, AwsQuantumTask
from braket.aws.queue_information import QueueDepthInfo
from braket.circuits import Circuit
from braket.devices import LocalSimulator
from braket.tasks.local_quantum_task import LocalQuantumTask
Expand Down Expand Up @@ -214,6 +215,39 @@ def qubit_properties(
# TODO: fetch information from device.properties.provider # pylint: disable=fixme
raise NotImplementedError

def queue_depth(self) -> QueueDepthInfo:
"""
Task queue depth refers to the total number of quantum tasks currently waiting
to run on a particular device.
Returns:
QueueDepthInfo: Instance of the QueueDepth class representing queue depth
information for quantum jobs and hybrid jobs.
Queue depth refers to the number of quantum jobs and hybrid jobs queued on a particular
device. The normal tasks refers to the quantum jobs not submitted via Hybrid Jobs.
Whereas, the priority tasks refers to the total number of quantum jobs waiting to run
submitted through Amazon Braket Hybrid Jobs. These tasks run before the normal tasks.
If the queue depth for normal or priority quantum tasks is greater than 4000, we display
their respective queue depth as '>4000'. Similarly, for hybrid jobs if there are more
than 1000 jobs queued on a device, display the hybrid jobs queue depth as '>1000'.
Additionally, for QPUs if hybrid jobs queue depth is 0, we display information about
priority and count of the running hybrid job.
Example:
Queue depth information for a running hybrid job.
>>> device = AWSBraketProvider().get_backend("SV1")
>>> print(device.queue_depth())
QueueDepthInfo(quantum_tasks={<QueueType.NORMAL: 'Normal'>: '0',
<QueueType.PRIORITY: 'Priority'>: '1'}, jobs='0 (1 prioritized job(s) running)')
If more than 4000 quantum jobs queued on a device.
>>> device = AWSBraketProvider().get_backend("SV1")
>>> print(device.queue_depth())
QueueDepthInfo(quantum_tasks={<QueueType.NORMAL: 'Normal'>: '>4000',
<QueueType.PRIORITY: 'Priority'>: '2000'}, jobs='100')
"""
return self._device.queue_depth()

@property
def dtm(self) -> float:
raise NotImplementedError(
Expand Down
37 changes: 37 additions & 0 deletions qiskit_braket_provider/providers/braket_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from warnings import warn

from braket.aws import AwsQuantumTask
from braket.aws.queue_information import QuantumTaskQueueInfo
from braket.tasks import GateModelQuantumTaskResult
from braket.tasks.local_quantum_task import LocalQuantumTask
from qiskit.providers import BackendV2, JobStatus, JobV1
Expand Down Expand Up @@ -119,6 +120,42 @@ def shots(self) -> int:
def submit(self):
return

def queue_position(self) -> QuantumTaskQueueInfo:
"""
The queue position details for the quantum job.
Returns:
QuantumTaskQueueInfo: Instance of QuantumTaskQueueInfo class
representing the queue position information for the quantum job.
The queue_position is only returned when quantum job is not in
RUNNING/CANCELLING/TERMINAL states, else queue_position is returned as None.
The normal tasks refers to the quantum jobs not submitted via Hybrid Jobs.
Whereas, the priority tasks refers to the total number of quantum jobs waiting to run
submitted through Amazon Braket Hybrid Jobs. These tasks run before the normal tasks.
If the queue position for normal or priority quantum tasks is greater than 2000,
we display their respective queue position as '>2000'.
Note: We don't provide queue information for the LocalQuantumTasks.
Examples:
job status = QUEUED and queue position is 2050
>>> task.queue_position()
QuantumTaskQueueInfo(queue_type=<QueueType.NORMAL: 'Normal'>,
queue_position='>2000', message=None)
job status = COMPLETED
>>> task.queue_position()
QuantumTaskQueueInfo(queue_type=<QueueType.NORMAL: 'Normal'>,
queue_position=None, message='Task is in COMPLETED status. AmazonBraket does
not show queue position for this status.')
"""
for task in self._tasks:
if isinstance(task, LocalQuantumTask):
raise NotImplementedError(
"We don't provide queue information for the LocalQuantumTask."
)
return AwsQuantumTask(self.task_id()).queue_position()

def task_id(self) -> str:
"""Return a unique id identifying the task."""
return self._task_id
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
certifi>=2021.5.30
qiskit-aer>=0.10.2
qiskit-terra>=0.19.2
amazon-braket-sdk>=1.33.0
amazon-braket-sdk>=1.56.0
retrying==1.3.3

setuptools>=40.1.0
Expand Down
25 changes: 23 additions & 2 deletions tests/providers/test_braket_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import unittest
from typing import Dict, List
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import Mock, patch

from botocore import errorfactory
from braket.aws.queue_information import QueueDepthInfo, QueueType
from qiskit import QuantumCircuit, transpile, BasicAer

from qiskit.algorithms.minimum_eigensolvers import VQE, VQEResult
Expand All @@ -22,7 +23,10 @@
from qiskit_braket_provider import AWSBraketProvider, version
from qiskit_braket_provider.providers import AWSBraketBackend, BraketLocalBackend
from qiskit_braket_provider.providers.adapter import aws_device_to_target
from tests.providers.mocks import RIGETTI_MOCK_GATE_MODEL_QPU_CAPABILITIES
from tests.providers.mocks import (
RIGETTI_MOCK_GATE_MODEL_QPU_CAPABILITIES,
RIGETTI_MOCK_M_3_QPU_CAPABILITIES,
)


def combine_dicts(
Expand Down Expand Up @@ -266,6 +270,23 @@ def test_native_circuits_with_measurements_can_be_run_in_verbatim_mode(self):

self.assertEqual(sum(result.get_counts().values()), 10)

@patch("qiskit_braket_provider.providers.braket_provider.AwsDevice")
def test_queue_depth(self, mocked_device):
"""Tests queue depth."""

mock_return_value = QueueDepthInfo(
quantum_tasks={QueueType.NORMAL: "19", QueueType.PRIORITY: "3"},
jobs="0 (3 prioritized job(s) running)",
)
mocked_device.properties = RIGETTI_MOCK_M_3_QPU_CAPABILITIES
mocked_device.queue_depth.return_value = mock_return_value
backend = AWSBraketBackend(device=mocked_device)
result = backend.queue_depth()

mocked_device.queue_depth.assert_called_once()
assert isinstance(result, QueueDepthInfo)
self.assertEqual(result, mock_return_value)


class TestAWSBackendTarget(TestCase):
"""Tests target for AWS Braket backend."""
Expand Down
12 changes: 12 additions & 0 deletions tests/providers/test_braket_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ def test_AWS_result(self):
self.assertEqual(job.result().results[0].shots, 3)
self.assertEqual(job.result().get_memory(), ["10", "10", "01"])

def test_queue_position_for_local_quantum_task(self):
"""Tests job status when multiple task status is present."""
job = AWSBraketJob(
backend=BraketLocalBackend(name="default"),
job_id="MockId",
tasks=[MOCK_LOCAL_QUANTUM_TASK],
shots=100,
)
message = "We don't provide queue information for the LocalQuantumTask."
with pytest.raises(NotImplementedError, match=message):
job.queue_position()


class TestBracketJobStatus:
"""Tests for AWS Braket job status."""
Expand Down
36 changes: 34 additions & 2 deletions tests/providers/test_braket_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from unittest.mock import Mock, patch
import uuid

from braket.aws.queue_information import QuantumTaskQueueInfo, QueueType
from braket.circuits import Circuit
from braket.aws import AwsSession, AwsQuantumTaskBatch
from braket.aws import AwsDevice, AwsDeviceType
from qiskit import circuit as qiskit_circuit
from qiskit import circuit as qiskit_circuit, QuantumCircuit
from qiskit.compiler import transpile

from qiskit_braket_provider.providers import AWSBraketProvider
Expand Down Expand Up @@ -104,7 +105,6 @@ def test_qiskit_circuit_transpilation_run(
state_vector_backend = provider.get_backend(
"SV1", aws_session=self.mock_session
)

transpiled_circuit = transpile(
q_circuit, backend=state_vector_backend, seed_transpiler=42
)
Expand Down Expand Up @@ -144,3 +144,35 @@ def test_discontinous_qubit_indices_qiskit_transpilation(self, mock_get_devices)

result = transpile(circ, device)
self.assertTrue(result)

@patch("qiskit_braket_provider.providers.braket_backend.AWSBraketBackend.run")
@patch(
"qiskit_braket_provider.providers.braket_job.AmazonBraketTask.queue_position"
)
@patch("qiskit_braket_provider.providers.braket_provider.AwsDevice")
def test_queue_position_for_quantum_tasks(
self, mocked_device, mock_queue_position, mock_run
):
"""Tests queue position for quantum tasks."""

mock_return_value = QuantumTaskQueueInfo(
queue_type=QueueType.NORMAL, queue_position=">2000", message=None
)
mock_task = Mock()
mock_task.queue_position = mock_queue_position
mock_queue_position.return_value = mock_return_value
mock_run.return_value = mock_task

mocked_device.properties = RIGETTI_MOCK_M_3_QPU_CAPABILITIES
device = AWSBraketBackend(device=mocked_device)
circuit = QuantumCircuit(3)
circuit.h(0)
circuit.cx(0, 1)
circuit.cx(0, 2)

qpu_task = device.run(circuit, shots=1)
result = qpu_task.queue_position()

mock_queue_position.assert_called_once()
assert isinstance(result, QuantumTaskQueueInfo)
self.assertEqual(result, mock_return_value)

0 comments on commit 5953cce

Please sign in to comment.