PySideのThreadPoolの使い方

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# -*- coding: utf-8 -*-

title = "PySideのThreadPoolの使い方"
tags = ["PySide", "Python"]

import sys
import os.path
import time

from PySide2 import QtCore, QtGui, QtWidgets


class WorkerSignal(QtCore.QObject):

    finished = QtCore.Signal(object)

    def __init__(self):
        super().__init__()


class Worker(QtCore.QRunnable):

    def __init__(self, threadId):
        super().__init__()

        self.threadId = threadId
        self.signals = WorkerSignal()

    def run(self):

        for i in range(10):
            print(f"{self.threadId} -> {str(i)}\n")
            print(QtCore.QThread.currentThread())
            time.sleep(0.1)

        self.signals.finished.emit(self)


class Pool():

    def __init__(self):

        self.workers = {}

        self.pool = QtCore.QThreadPool()
        self.pool.setMaxThreadCount(5)

    def finishTask(self, worker):

        print(f'finish task -> {worker.threadId}')
        self.workers.pop(worker.threadId)

    def startWorkers(self, count):

        for i in range(count):
            self.workers[i] = Worker(i)
            self.workers[i].signals.finished.connect(self.finishTask)
            self.workers[i].setAutoDelete(True)
            self.pool.start(self.workers[i])

        # while self.pool.activeThreadCount() > 0:  # waitForDone と同じ。
        #     time.sleep(1)


if __name__ == '__main__':
    app = QtWidgets.QApplication(sys.argv)

    pool = Pool()
    pool.startWorkers(10)

    sys.exit(app.exec_())

最終更新日: 2021-12-03 00:04:51

Tag
Back to top