-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathworkflow_status_listner.py
More file actions
71 lines (59 loc) · 2.29 KB
/
workflow_status_listner.py
File metadata and controls
71 lines (59 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Workflow Status Listener Example
=================================
Demonstrates enabling external status listeners for workflow state changes.
What it does:
-------------
- Creates a workflow with HTTP task
- Enables a Kafka status listener
- Registers the workflow with listener configuration
- Status changes will be published to specified Kafka topic
Use Cases:
----------
- Real-time workflow monitoring via message queues
- Integrating workflows with external systems (Kafka, SQS, etc.)
- Building event-driven architectures
- Audit logging and compliance tracking
- Custom notifications on workflow state changes
- Analytics and metrics collection
Status Events Published:
------------------------
- Workflow started
- Workflow completed
- Workflow failed
- Workflow paused
- Workflow resumed
- Workflow terminated
- Task status changes
Key Concepts:
-------------
- Status Listener: External sink for workflow events
- enable_status_listener(): Configure where events are sent
- Kafka Integration: Publish events to Kafka topics
- Event-Driven Architecture: React to workflow state changes
- Workflow Registration: Persist workflow with listener config
Example Kafka Topic: kafka:<topic_name>
Example SQS Queue: sqs:<queue_url>
"""
import time
import uuid
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import StartWorkflowRequest, RerunWorkflowRequest, TaskResult
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.wait_task import WaitTask
def main():
api_config = Configuration()
clients = OrkesClients(configuration=api_config)
workflow = ConductorWorkflow(name='workflow_status_listener_demo', version=1,
executor=clients.get_workflow_executor())
workflow >> HttpTask(task_ref_name='http_ref', http_input={
'uri': 'https://orkes-api-tester.orkesconductor.com/api'
})
workflow.enable_status_listener('kafka:abcd')
workflow.register(overwrite=True)
print(f'Registered {workflow.name}')
if __name__ == '__main__':
main()