Python - очередь с приоритетом
В статье разбираем практический случай создания очереди с сортировкой по приоритету.
Простая очередь.
Самый просто пример использования очереди - это поэтапная обработка ссылок на скачиваение файлов. Для этого достаточно воспользоваться простейшим списком list
куда можно закидывать любой объект c помощью метода .append()
. В действительности list
не является настоящей очередью (но сейчас это не важно). Получить объект содержащийся в такой “очереди” так же просто, для этого есть метод .pop()
. При этом стоит помнить, что метод .pop()
этот элемент удаляет навсегда.
queue = [] item1 = "https://www.example.com/001.zip" item2 = "https://www.example.com/002.zip" item3 = "https://www.example.com/003.zip" # Закинем в очередь объекты queue.append(item1) queue.append(item2) queue.append(item3) # Получаем из очереди объекты queue.pop() # Выведет: queue.pop() # "https://www.example.com/003.zip" queue.pop() # "https://www.example.com/002.zip" queue.pop() # "https://www.example.com/001.zip"
Вроде всё просто, но в данном случае простота не значит гибкость. Вот основные проблемы:
- нет автоматической сортировки, элементы из списка можно получить с конца в начало (если вызывать
.pop(0)
- с начала в конец); - при применении метода
.pop()
к списку не содержащему элементов возникает исключениеIndexError
которое нужно обработать. - нет гарантии что вы работаете однозначно с одним и тем же объектом
queue
Реализация очереди с приоритетом.
Для избежания недостатков простого list
предлагаю вооружится встроенным модулем heapq
(куча) и паттерном Синглтон.
Почему и зачем использовать модуль heapq
? Всё просто, этот модуль может просто и быстро отсортировать объекты по приоритету.
Паттерн Синглтон здесь будет использоваться для гарантированной работы только с одним экземпляром объекта Queue
.
Весь код класса Queue
приведён ниже:
import heapq class Queue: def __new__(cls): if not hasattr(cls, 'instance'): cls.instance = super(Queue, cls).__new__(cls) return cls.instance def __init__(self): self.queue = [] heapq.heapify(self.queue) self.index = 0 def __str__(self): return f"Last index element in queue: {self.index}." def __repr__(self): return f"Queue instance contains {self.queue.__len__()} elements." def push(self, item, priority=False): priority_level = 1 if priority: priority_level = -1 entry = (priority_level, self.index, item) heapq.heappush(self.queue, entry) self.index = self.index + 1 def pull(self): entry = heapq.heappop(self.queue) return entry[2] # Использование: scheduler = Queue() item1 = "https://www.example.com/001.zip" item2 = "https://www.example.com/002.zip" item3 = "https://www.example.com/003.zip" scheduler.push(item1) scheduler.push(item2, priority=True) scheduler.push(item3) # Вывод: str(scheduler) # Last index element in queue: 2. repr(scheduler) # Queue instance contains 3 elements. scheduler.pull() # "https://www.example.com/002.zip" scheduler.pull() # "https://www.example.com/001.zip" scheduler.pull() # "https://www.example.com/003.zip"
А теперь подробнее разберём код. Для начала импортируем необходимый модуль heapq
:
import heapq
Дальше создадим класс Queue
и добавим в него конструктор __new__
и инициализатор __init__
:
class Queue: def __new__(cls): if not hasattr(cls, 'instance'): cls.instance = super().__new__(cls) return cls.instance
Здесь я подробнее остановлюсь на конструкторе __new__
(это и будет наша реализация паттерна Синглтон):
Для дальнейшего создания любого количества экземпляров класса Queue
которые будут гарантировано ссылаться только на один объект Queue
в памяти - можно воспользоваться “волшебным” методом __new__
который инициализируется в самом начале создания экземпляра класса.
- С помощью метода
hasattr()
сделаем проверку на отсутствие аттрибутаinstance
у вновь создаваемого экземпляра классаQueue
- Если
instance
нет - создадим аттрибутinstance
и присвоим ему результат выполненияsuper().__new__(cls)
. - Если
instance
в экземпляре есть - просто вернём содержимоеinstance
- Магия тут кроется в том что результатом первого вызова
super().__new__(cls)
будет создан экземпляр классаQueue
, а все последующие попытки создать экземпляры классаQueue
вынужденно будут возвращать результат первого вызоваsuper()__new__(cls)
.
def __init__(self): self.queue = [] heapq.heapify(self.queue) self.index = 0
В котором мы создадим объект self.queue
- очередь с пустым списком list
. Для того чтобы превратить его в “кучу” heap
просто передадим self.queue
в качестве параметра в функцию heapify()
. Далее создадим объект с информацией о начальном индексе self.index
. По умолчанию индекс = 0.
Следующие методы __str__
и __repr__
имеют практическую ценность в случае отладки нашего кода (но они не обязательны):
def __str__(self): return f"Last index element in queue: {self.index - 1}." def __repr__(self): return f"Queue instance contains {self.queue.__len__()} elements."
Метод __str__
будет выводить индекс присвоенный объекту в нашей очереди, который был добавлен последним. Метод __repr__
будет информировать об общем количестве элементов в очереди.
Метод push()
отвечает за добавление объекта в очередь:
def push(self, item: Any, priority=False): priority_level = 1 if priority: priority_level = -1 entry = (priority_level, self.index, item) heapq.heappush(self.queue, entry) self.index = self.index + 1
entry
это кортеж с тремя элементами: приоритет, индекс, и сам объект. У каждого объекта помещаемого в очередь задается приоритет. Перед каждым помещением объекта в очередь приоритет сбрасывается на 1. Если priority=True
тогда объект в очередь помещается с приоритетом = -1. Теперь этот элемент будет в самом начале очереди, а значит наш метод pull()
выдаст его первым. Происходит это потому, что heapq
сортирует очередь исходя из “веса” элемента, с учётом его расположения. В случае наличия в очереди нескольких элементов с одинаковым приоритетом, сортировка будет осуществлятся по “весу” второго элемента - индекса. После добавления объекта в очередь, увеличиваем индекс на 1 пункт.
Метод pull()
отвечает за возврат объекта из очереди, с учётом приоритета:
def pull(self): entry = heapq.heappop(self.queue) return entry[2]
Метод .heappop()
берёт из очереди объект у которого самое нижнее значение приоритета и номер индекса и выдаёт нам только последний (третий) элемент. Не забываем что нумерация идёт от 0.
Реализация очереди поддерживающей работу с потоками.
Код который мы написали выше не подходит для работы с множеством потоков (используя модуль threading
). В случае использования потоков, где один пишет сообщение в очередь, а второй забирает это сообщение из очереди и обрабатывает его, может возникнуть блокировка очереди для одного из потоков, которая может привести к возникновению исключения. Для исправления проблемы понадобиться заменить модуль heapq
на queue
и изменить несколько строк в нашем классе Queue
.
- удаляем импорт модуля
heapq
и добавляем импорт классаPriorityQueue
из модуляqueue
self.queue = []
меняем наself.queue = PriorityQueue()
, для создание экземпляра классаPriorityQueue
- удаляем строчку
heapq.heapify(self.queue)
- в методе
__repr__()
меняемself.queue.__len__()
наself.queue._qsize()
heapq.heappush(self.queue, entry)
меняем наself.queue.put(entry)
- в методе
.pull()
строчкуentry = heapq.heappop(self.queue)
меняем на
if self.queue._qsize() == 0: return None else: entry = self.queue.get()
Пояснение: в случае если количество элементов в очереди будет равно 0 self.queue.get()
будет выполнятся вечно, т.к. будет ожидать элемент из очереди. Для предотвращения такого поведения делаем проверку.
Итак, весь код выглядит вот так:
from queue import PriorityQueue class Queue: def __new__(cls): if not hasattr(cls, 'instance'): cls.instance = super().__new__(cls) return cls.instance def __init__(self): self.queue = PriorityQueue() self.index = 0 def __str__(self): return f"Last index element in queue: {self.index}" def __repr__(self): return f"Queue instance contains {self.queue._qsize()} elements." def push(self, item, priority=False): priority_level = 1 if priority: priority.level = -1 entry = (priority_level, self.index, item) self.queue.put(entry) self.index = self.index + 1 def pull(self): if self.queue._qsize() == 0: return None else: entry = self.queue.get() return entry[2]
Таким образом наш класс Queue
теперь стал поддерживать работу с потоками.
from threading import Thread scheduler = Queue() def generate_links(item): for i in range(item): scheduler.push(i) # добавим одну приоритетную scheduler.push(item+1, priority=True) def print_links(): n=0 while n is not None: n=scheduler.pull() print(f"Item in scheduler = {n}") first_thread = Thread(target=generate_links, name='Generator', args=(5,)) seconf_thread = Thread(target=print_links, name='Printer') first_thread.start() str(scheduler) # Last index element in queue: 5. repr(scheduler) # Queue instance contains 6 elements. seconf_thread.start() # Вывод: # Item in schedule = 6 # Item in schedule = 0 # Item in schedule = 1 # Item in schedule = 2 # Item in schedule = 3 # Item in schedule = 4 # Item in schedule = None ## None - очередь пуста