# Copyright 2024 Apheleia
#
# Description:
# Apheleia Verification Library Sequencer
from __future__ import annotations
import random
from typing import TYPE_CHECKING
from cocotb.triggers import Event
from .component import Component
from .port import Port
if TYPE_CHECKING:
from .sequence import Sequence
from .sequence_item import SequenceItem
[docs]
class Sequencer(Component):
[docs]
def __init__(self, name: str, parent: Component) -> None:
"""
Initialize the Sequencer instance.
:param name: Name of the sequencer.
:type name: str
:param parent: Parent component.
:type parent: Component
"""
super().__init__(name, parent)
self.seq_item_export = Port("seq_item_export", self)
# Locks
self._locks_ = []
self._lock_ev_ = Event()
self.current_lock = None
# Sequences
self._seqs_ = []
self._seq_ev_ = Event()
[docs]
def send_request(self, seq: Sequence, item: SequenceItem) -> None:
"""
Sends a request to the sequencer.
:param seq: The sequence sending the request.
:param item: The item to be sent.
"""
self.seq_item_export.write(item)
[docs]
def arbitrate(self) -> tuple[Sequence, Event, int]:
"""
Arbitrates between available sequences based on their priorities.
:returns: A tuple containing the selected sequence, its event, and its priority.
If no sequence is available, returns (None, None, None).
"""
ids = []
weights = []
for i in range(len(self._seqs_)):
if (self.current_lock) is None or (self._seqs_[i][0] == self.current_lock):
ids.append(i)
weights.append(self._seqs_[i][2])
if len(ids) == 0:
return (None, None, None)
else:
id = random.choices(ids, weights=weights, k=1)[0]
return self._seqs_.pop(id)
[docs]
async def lock(self, seq: Sequence) -> None:
"""
Locks the sequencer for the sequence.
:param seq: The sequence locking the sequencer.
"""
if seq in self._locks_:
raise Exception("Sequence already requested sequencer lock")
ev = Event()
self._locks_.append((seq, ev))
await ev.wait()
ev.clear()
self._lock_ev_.set()
[docs]
def unlock(self, seq: Sequence) -> None:
"""
Unlocks the sequencer for the sequence.
:param seq: The sequence unlocking the sequencer.
"""
if self.current_lock == seq:
self.current_lock = None
for s, ev in self._locks_:
if s == seq:
self._locks_.remove((s, ev))
break
[docs]
def get_lock(self) -> Sequence:
"""
Gets the current lock.
:returns: The current lock.
"""
return self.current_lock
[docs]
async def wait_for_grant(self, seq: Sequence, priority: int = None) -> None:
"""
Waits for a grant to run the sequence.
:param seq: The sequence requesting the grant.
:param priority: The priority of the sequence (optional).
"""
if priority is None:
priority = seq.get_priority()
ev = Event()
self._seqs_.append((seq, ev, priority))
self._seq_ev_.set()
await ev.wait()
ev.clear()
[docs]
async def run_phase(self) -> None:
"""
Runs the sequencer phase.
"""
while True:
# Wait until there is a sequence to run
await self._seq_ev_.wait()
self._seq_ev_.clear()
while True:
# Check locks
if (self.current_lock is None) and (len(self._locks_) > 0):
(self.current_lock, lock_ev) = self._locks_.pop(0)
lock_ev.set()
await self._lock_ev_.wait()
self._lock_ev_.clear()
# Arbitrate for the next sequence
(seq, ev, priority) = self.arbitrate()
if seq is not None:
ev.set()
else:
break
__all__ = ["Sequencer"]