Developer Reference

pfdl_scheduler

api

observer_api

This module contains the classes that implement the Observer pattern.

The NotificationType class is an enum that sets the type of a notification.

The abstract Observer class represents the observers in the pattern and requires an update method, which is called by a Subject object. The method receives the type of the notification and the corresponding data.

The abstract Subject class represents the subjects in the observer pattern. It provides methods to attach or detach observers and to notify them.

NotificationType

Bases: Enum

Declares the type of Notification in the observer pattern.

Observer

Bases: ABC

The Observer interface declares the update method, used by subjects.

update(notification_type, data) abstractmethod

Receive update from subject.

Source code in pfdl_scheduler/api/observer_api.py
@abstractmethod
def update(self, notification_type: NotificationType, data: Any) -> None:
    """Receive update from subject."""
    pass
Subject

Bases: ABC

The Subject interface declares a set of methods for managing subscribers.

attach(observer) abstractmethod

Attach an observer to the subject.

Source code in pfdl_scheduler/api/observer_api.py
@abstractmethod
def attach(self, observer: Observer) -> None:
    """Attach an observer to the subject."""
    pass
detach(observer) abstractmethod

Detach an observer from the subject.

Source code in pfdl_scheduler/api/observer_api.py
@abstractmethod
def detach(self, observer: Observer) -> None:
    """Detach an observer from the subject."""
    pass
notify(notification_type, data) abstractmethod

Notify all observers about an event.

Source code in pfdl_scheduler/api/observer_api.py
@abstractmethod
def notify(self, notification_type: NotificationType, data: Any) -> None:
    """Notify all observers about an event."""
    pass
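
The following is a minimal sketch of how the three classes might work together. It redeclares the interfaces locally so that it runs on its own. The enum members `STARTED` and `FINISHED` as well as the classes `RecordingObserver` and `SimpleSubject` are hypothetical and not part of pfdl_scheduler.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, List, Tuple


class NotificationType(Enum):
    # Hypothetical members: the real enum values are not listed in this reference.
    STARTED = 1
    FINISHED = 2


class Observer(ABC):
    @abstractmethod
    def update(self, notification_type: NotificationType, data: Any) -> None:
        """Receive update from subject."""


class Subject(ABC):
    @abstractmethod
    def attach(self, observer: Observer) -> None:
        """Attach an observer to the subject."""

    @abstractmethod
    def detach(self, observer: Observer) -> None:
        """Detach an observer from the subject."""

    @abstractmethod
    def notify(self, notification_type: NotificationType, data: Any) -> None:
        """Notify all observers about an event."""


class RecordingObserver(Observer):
    """Hypothetical observer that stores every notification it receives."""

    def __init__(self) -> None:
        self.received: List[Tuple[NotificationType, Any]] = []

    def update(self, notification_type: NotificationType, data: Any) -> None:
        self.received.append((notification_type, data))


class SimpleSubject(Subject):
    """Hypothetical subject that keeps its observers in a list."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, notification_type: NotificationType, data: Any) -> None:
        # Iterate over a copy so observers may detach themselves during update.
        for observer in list(self._observers):
            observer.update(notification_type, data)


subject = SimpleSubject()
observer = RecordingObserver()
subject.attach(observer)
subject.notify(NotificationType.STARTED, {"task": "demo"})
subject.detach(observer)
subject.notify(NotificationType.FINISHED, {"task": "demo"})  # not delivered anymore
```

After the run, `observer.received` holds only the first notification, because the observer was detached before the second call to `notify`.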

service_api

Contains the ServiceAPI class.

ServiceAPI(service, task_context, uuid='', in_loop=False) dataclass

Represents a called Service.

Represents a Service or Service Call in the language which can be mapped to a real service that can be executed.

Attributes:

Name Type Description
service Service

A description of the called Service.

task_context TaskAPI

A TaskAPI representation of the Task from which the service was called.

uuid str

A UUID4 which is generated at object creation and is used in the scheduling.

in_loop bool

A boolean indicating whether the Service was called within a loop.

Initialize the object.

Parameters:

Name Type Description Default
service Service

A description of the called Service.

required
task_context TaskAPI

A TaskAPI representation of the Task from which the service was called.

required
uuid str

A UUID4 which is generated at object creation and is used in the scheduling.

''
in_loop bool

A boolean indicating whether the Service was called within a loop.

False

Source code in pfdl_scheduler/api/service_api.py
def __init__(
    self,
    service: Service,
    task_context: TaskAPI,
    uuid: str = "",
    in_loop: bool = False,
) -> None:
    """Initialize the object.
    Args:
        service: A description of the called Service.
        task_context: A TaskAPI representation of the Task from which the service was called.
        uuid: A UUID4 which is generated at object creation and is used in the scheduling.
        in_loop: A boolean indicating whether the Service was called within a loop.
    """
    if uuid == "":
        self.uuid: str = str(uuid4())
    else:
        self.uuid: str = uuid
    self.in_loop: bool = in_loop
    self.service: Service = service
    self.task_context: TaskAPI = task_context
    self.input_parameters: List[Union[str, List[str], Struct]] = copy.deepcopy(
        service.input_parameters
    )
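
The initializer above has two effects that are easy to overlook: an empty `uuid` argument is replaced by a freshly generated UUID4, and the input parameters are deep-copied from the Service. The sketch below illustrates both with stand-in classes. `FakeService` and `ServiceCallSketch` are hypothetical names that mirror only the logic shown above, they are not the real classes.

```python
import copy
from typing import Any, List
from uuid import UUID, uuid4


class FakeService:
    """Hypothetical stand-in for the Service model class."""

    def __init__(self, name: str, input_parameters: List[Any]) -> None:
        self.name = name
        self.input_parameters = input_parameters


class ServiceCallSketch:
    """Mirrors the initialization logic shown above (not the real ServiceAPI)."""

    def __init__(self, service, task_context=None, uuid: str = "", in_loop: bool = False):
        # An empty string means "generate a new UUID4".
        self.uuid: str = str(uuid4()) if uuid == "" else uuid
        self.in_loop: bool = in_loop
        self.service = service
        self.task_context = task_context
        # The deep copy decouples the call from the Service description.
        self.input_parameters = copy.deepcopy(service.input_parameters)


service = FakeService("paint", ["color", ["sheet", "width"]])
first = ServiceCallSketch(service)
second = ServiceCallSketch(service, uuid="fixed-id")

# Changing the copy leaves the parameters of the Service untouched.
first.input_parameters[1].append("changed")
```

Because of the deep copy, even the nested list inside `service.input_parameters` is unaffected by the modification made through `first`.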

task_api

Contains the TaskAPI class.

TaskAPI(task, task_context, uuid='', task_call=None, in_loop=False) dataclass

Represents a called Task.

Represents a specific instance of a called Task. It combines the information of the Task itself and the parameters of the call. A TaskAPI object is passed to the task started and task finished callbacks, which are called by the scheduler.

Attributes:

Name Type Description
task Task

A description of the called Task.

task_context TaskAPI

A TaskAPI representation of the Task from which the called task was invoked.

uuid str

A UUID4 which is generated at object creation and is used in the scheduling.

task_call TaskCall

Information about the input and output parameters of the called Task.

in_loop bool

A boolean indicating whether the Task was called within a loop.

Initialize the object.

Parameters:

Name Type Description Default
task Task

A description of the called Task.

required
task_context TaskAPI

A TaskAPI representation of the Task from which the called task was invoked.

required
uuid str

A UUID4 which is generated at object creation and is used in the scheduling.

''
task_call TaskCall

Information about the input and output parameters of the called Task.

None
in_loop bool

A boolean indicating whether the Task was called within a loop.

False
Source code in pfdl_scheduler/api/task_api.py
def __init__(
    self,
    task: Task,
    task_context: "TaskAPI",
    uuid: str = "",
    task_call: TaskCall = None,
    in_loop: bool = False,
) -> None:
    """Initialize the object.

    Args:
        task: A description of the called Task.
        task_context: A TaskAPI representation of the Task from which the called task was invoked.
        uuid: A UUID4 which is generated at object creation and is used in the scheduling.
        task_call: Information about the input and output parameters of the called Task.
        in_loop: A boolean indicating whether the Task was called within a loop.
    """
    if uuid == "":
        self.uuid: str = str(uuid4())
    else:
        self.uuid: str = uuid
    self.task: Task = task
    self.task_context: TaskAPI = task_context
    self.task_call: TaskCall = task_call

    if task_call:
        self.input_parameters: List[Union[str, List[str], Struct]] = copy.deepcopy(
            task_call.input_parameters
        )
    else:
        self.input_parameters: List[Union[str, List[str], Struct]] = []
    self.in_loop: bool = in_loop
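
Since every TaskAPI object stores the TaskAPI of its caller in `task_context`, the objects form a chain that could be followed back to the outermost task. The sketch below shows one way to do this. It is a sketch under assumptions: `TaskContextSketch` and `call_chain` are hypothetical, the `name` field stands in for `task.name`, and it is assumed that the outermost task has `task_context` set to `None`.

```python
from typing import List, Optional
from uuid import uuid4


class TaskContextSketch:
    """Hypothetical stand-in that keeps only the fields needed here."""

    def __init__(
        self,
        name: str,
        task_context: Optional["TaskContextSketch"] = None,
        uuid: str = "",
    ) -> None:
        self.name = name  # stands in for task.name of the real class
        self.task_context = task_context
        self.uuid = str(uuid4()) if uuid == "" else uuid


def call_chain(task_api: TaskContextSketch) -> List[str]:
    """Return the task names from the outermost caller down to task_api."""
    names: List[str] = []
    current: Optional[TaskContextSketch] = task_api
    while current is not None:  # assumption: the outermost task has no context
        names.append(current.name)
        current = current.task_context
    names.reverse()
    return names


root = TaskContextSketch("productionTask")
child = TaskContextSketch("paintingTask", task_context=root)
grandchild = TaskContextSketch("dryingTask", task_context=child)
chain = call_chain(grandchild)
```

Here `chain` lists the three task names starting with `productionTask`, which can be handy for logging inside a task started callback.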

extension

Contains the start-up script used in the VS Code extension.

The script is executed by the VS Code extension. It takes a string containing a PFDL program and the name of the corresponding file as input.

model

array

Contains the Array class.

Array(type_of_elements='', values=None, context=None)

Represents an Array in the PFDL.

Used as an array definition or as a returned array with elements in it.

Attributes:

Name Type Description
type_of_elements str

A string representing the type of the elements inside the array.

values List[Any]

A list of elements of the Array (empty if it is an array definition).

length int

An integer for the length of the Array. If it is not defined it gets the value -1.

context ParserRuleContext

ANTLR context object of this class.

Initialize the object.

Parameters:

Name Type Description Default
type_of_elements str

A string representing the type of the elements inside the array.

''
values List[Any]

A list of elements of the Array (empty if it is an array definition).

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/array.py
def __init__(
    self,
    type_of_elements: str = "",
    values: List[Any] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        type_of_elements: A string representing the type of the elements inside the array.
        values: A list of elements of the Array (empty if it is an array definition).
        context: ANTLR context object of this class.
    """
    self.type_of_elements: str = type_of_elements
    if values:
        self.length: int = len(values)
        self.values: List[Any] = values
    else:
        self.values: List[Any] = []
        self.length: int = -1
    self.context: ParserRuleContext = context
append_value(value)

Adds an element to the array and increases the length.

Parameters:

Name Type Description Default
value Any

The value that should be added to the array.

required
Source code in pfdl_scheduler/model/array.py
def append_value(self, value: Any) -> None:
    """Adds an element to the array and increases the length.

    Args:
        value: The value that should be added to the array.
    """
    if self.length == -1:  # Set length to 0 for arrays with undefined length
        self.length = 0
    self.values.append(value)
    self.length = self.length + 1
length_defined()

Returns whether the length of the array is defined.

Returns:

Type Description
bool

True if the length of the array is defined.

Source code in pfdl_scheduler/model/array.py
def length_defined(self) -> bool:
    """Returns whether the length of the array is defined.

    Returns:
        True if the length of the array is defined.
    """
    return self.length != -1
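
The interplay between `length`, `append_value` and `length_defined` can be illustrated with a reduced copy of the class. `ArraySketch` below is a hypothetical stand-in that omits the ANTLR context but otherwise follows the source shown above.

```python
from typing import Any, List, Optional


class ArraySketch:
    """Reduced copy of the Array logic shown above (no ANTLR context)."""

    def __init__(self, type_of_elements: str = "", values: Optional[List[Any]] = None):
        self.type_of_elements: str = type_of_elements
        if values:
            self.length: int = len(values)
            self.values: List[Any] = values
        else:
            # Both None and an empty list end up here.
            self.values = []
            self.length = -1

    def append_value(self, value: Any) -> None:
        if self.length == -1:  # undefined length starts counting at 0
            self.length = 0
        self.values.append(value)
        self.length = self.length + 1

    def length_defined(self) -> bool:
        return self.length != -1


definition = ArraySketch("number")           # array definition without values
empty_literal = ArraySketch("number", [])    # empty list is treated like None
filled = ArraySketch("number", [1, 2, 3])

definition.append_value(7)                   # length goes from -1 to 1
```

Note that an empty list passed as `values` is falsy, so `empty_literal` is treated like an array definition with an undefined length of -1 rather than a length of 0.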

condition

Contains the Condition class.

Condition(expression=None, passed_stmts=None, failed_stmts=None, context=None) dataclass

Represents a conditional statement in the PFDL.

A Condition consists of a boolean expression which has to be satisfied in order to execute the statements in the Passed block. Otherwise the statements in the Failed block will be executed.

Attributes:

Name Type Description
expression Dict

Boolean expression in form of a dict (see Visitor for the dict structure).

passed_stmts List[Union[Service, TaskCall, Loop, Condition]]

List of statements which are executed when the expression is satisfied.

failed_stmts List[Union[Service, TaskCall, Loop, Condition]]

List of statements which are executed when the expression is not satisfied.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
expression Dict

Boolean expression in form of a dict (see Visitor for the dict structure).

None
passed_stmts List[Union[Service, TaskCall, Loop, Condition]]

List of statements which are executed when the expression is satisfied.

None
failed_stmts List[Union[Service, TaskCall, Loop, Condition]]

List of statements which are executed when the expression is not satisfied.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/condition.py
def __init__(
    self,
    expression: Dict = None,
    passed_stmts: List[Union[Service, TaskCall, "Loop", "Condition"]] = None,
    failed_stmts: List[Union[Service, TaskCall, "Loop", "Condition"]] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        expression: Boolean expression in form of a dict (see Visitor for the dict structure).
        passed_stmts: List of statements which are executed when the expression is satisfied.
        failed_stmts: List of statements which are executed when the expression is not satisfied.
        context: ANTLR context object of this class.
    """
    self.expression: Dict = expression

    if passed_stmts:
        self.passed_stmts: List[Union[Service, TaskCall, "Loop", "Condition"]] = passed_stmts
    else:
        self.passed_stmts: List[Union[Service, TaskCall, "Loop", "Condition"]] = []

    if failed_stmts:
        self.failed_stmts: List[Union[Service, TaskCall, "Loop", "Condition"]] = failed_stmts
    else:
        self.failed_stmts: List[Union[Service, TaskCall, "Loop", "Condition"]] = []

    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}

counting_loop

Contains the CountingLoop class.

CountingLoop(statements=None, counting_variable='', limit='', parallel=False, context=None) dataclass

Bases: Loop

Represents a Counting Loop in the PFDL.

Counting loops count a variable from an initial value to a given upper limit. If the parallel keyword was used, this loop executes the statements in the loop body in parallel as many times as the loop would iterate.

Attributes:

Name Type Description
statements

List of statements inside the loop body.

context

ANTLR context object of this class.

context_dict

Maps other attributes with ANTLR context objects.

counting_variable str

Name of the variable which is counted in the loop.

limit str

Integer for the upper limit.

parallel bool

A boolean indicating if the loop is a parallel loop or not.

Initialize the object.

Parameters:

Name Type Description Default
statements List[Union[Service, TaskCall, Loop, Condition]]

List of statements inside the loop body.

None
counting_variable str

Name of the variable which is counted in the loop.

''
limit str

Integer for the upper limit.

''
parallel bool

A boolean indicating if the loop is a parallel loop or not.

False
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/counting_loop.py
def __init__(
    self,
    statements: List[Union[Service, TaskCall, Loop, Condition]] = None,
    counting_variable: str = "",
    limit: str = "",
    parallel: bool = False,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        statements: List of statements inside the loop body.
        counting_variable: Name of the variable which is counted in the loop.
        limit: Integer for the upper limit.
        parallel: A boolean indicating if the loop is a parallel loop or not.
        context: ANTLR context object of this class.
    """
    Loop.__init__(self, statements, context)
    self.counting_variable: str = counting_variable
    self.limit: str = limit
    self.parallel: bool = parallel
    self.uuid = str(uuid.uuid4())
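
As a rough illustration of the initializer above, the sketch below rebuilds the Loop and CountingLoop initialization with stand-in classes. `LoopSketch` and `CountingLoopSketch` are hypothetical names, and the statement `"someStatement"` is a placeholder string instead of a real Service or TaskCall object.

```python
import uuid
from typing import Any, Dict, List, Optional


class LoopSketch:
    """Stand-in for the Loop base class."""

    def __init__(self, statements: Optional[List[Any]] = None, context: Any = None):
        # A None default is replaced by a new list for every instance.
        self.statements: List[Any] = statements if statements else []
        self.context = context
        self.context_dict: Dict = {}


class CountingLoopSketch(LoopSketch):
    """Stand-in for CountingLoop, following the initializer shown above."""

    def __init__(
        self,
        statements: Optional[List[Any]] = None,
        counting_variable: str = "",
        limit: str = "",
        parallel: bool = False,
        context: Any = None,
    ) -> None:
        LoopSketch.__init__(self, statements, context)
        self.counting_variable: str = counting_variable
        self.limit: str = limit  # kept as a string, as in the signature above
        self.parallel: bool = parallel
        self.uuid: str = str(uuid.uuid4())  # unique per loop object


loop_a = CountingLoopSketch(counting_variable="i", limit="3")
loop_b = CountingLoopSketch(counting_variable="i", limit="3", parallel=True)

# The statement lists are independent, although both were created by default.
loop_a.statements.append("someStatement")
```

Each loop object receives its own UUID and its own statement list, so two loops created with identical arguments can still be told apart.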

loop

Contains the Loop class.

Loop(statements=None, context=None) dataclass

The base class for the PFDL loops.

Attributes:

Name Type Description
statements List[Union[Service, TaskCall, Loop, Condition]]

List of statements inside the loop body.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
statements List[Union[Service, TaskCall, Loop, Condition]]

List of statements inside the loop body.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/loop.py
def __init__(
    self,
    statements: List[Union[Service, TaskCall, "Loop", Condition]] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        statements: List of statements inside the loop body.
        context: ANTLR context object of this class.
    """
    if statements:
        self.statements: List[Union[Service, TaskCall, "Loop", Condition]] = statements
    else:
        self.statements: List[Union[Service, TaskCall, "Loop", Condition]] = []
    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}

parallel

Contains the Parallel class.

Parallel(task_calls=None, context=None) dataclass

Represents a Parallel statement in the PFDL.

Each task within this instruction is executed in parallel with the calling task. When all parallel tasks are finished, the calling task continues its execution.

Attributes:

Name Type Description
task_calls List[TaskCall]

List of Task Calls in the Parallel statement.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
task_calls List[TaskCall]

List of Task Calls in the Parallel statement.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/parallel.py
def __init__(
    self, task_calls: List[TaskCall] = None, context: ParserRuleContext = None
) -> None:
    """Initialize the object.

    Args:
        task_calls: List of Task Calls in the Parallel statement.
        context: ANTLR context object of this class.
    """
    if task_calls:
        self.task_calls: List[TaskCall] = task_calls
    else:
        self.task_calls: List[TaskCall] = []
    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}

process

Contains the Process class.

Process(structs=None, tasks=None) dataclass

Represents a production process described in a PFDL file.

A Process consists of multiple Structs and Tasks. A Process object gets created after the visitor traverses the syntax tree.

Attributes:

Name Type Description
structs Dict[str, Struct]

A dict for mapping the Struct names to the Struct objects.

tasks Dict[str, Task]

A dict for mapping the Task names to the Task objects.

Initialize the object.

Parameters:

Name Type Description Default
structs Dict[str, Struct]

A dict for mapping the Struct names to the Struct objects.

None
tasks Dict[str, Task]

A dict for mapping the Task names to the Task objects.

None
Source code in pfdl_scheduler/model/process.py
def __init__(self, structs: Dict[str, Struct] = None, tasks: Dict[str, Task] = None) -> None:
    """Initialize the object.

    Args:
        structs: A dict for mapping the Struct names to the Struct objects.
        tasks: A dict for mapping the Task names to the Task objects.
    """
    if structs:
        self.structs: Dict[str, Struct] = structs
    else:
        self.structs: Dict[str, Struct] = {}
    if tasks:
        self.tasks: Dict[str, Task] = tasks
    else:
        self.tasks: Dict[str, Task] = {}

service

Contains the Service class.

Service(name='', input_parameters=None, output_parameters=None, context=None) dataclass

Represents a Service or Service Call in the PFDL.

Represents a Service or Service Call in the language which can be mapped to a real service that can be executed.

Attributes:

Name Type Description
name str

A string representing the name of the Service.

input_parameters List[Union[str, List[str], Struct]]

List of input parameters of the Service.

output_parameters OrderedDict[str, Union[str, Array]]

List of output parameters of the Service.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
name str

A string representing the name of the Service.

''
input_parameters List[Union[str, List[str], Struct]]

List of input parameters of the Service.

None
output_parameters Dict[str, Union[str, Array]]

List of output parameters of the Service.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/service.py
def __init__(
    self,
    name: str = "",
    input_parameters: List[Union[str, List[str], Struct]] = None,
    output_parameters: Dict[str, Union[str, Array]] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        name: A string representing the name of the Service.
        input_parameters: List of input parameters of the Service.
        output_parameters: List of output parameters of the Service.
        context: ANTLR context object of this class.
    """
    self.name: str = name

    if input_parameters:
        self.input_parameters: List[Union[str, List[str], Struct]] = input_parameters
    else:
        self.input_parameters: List[Union[str, List[str], Struct]] = []

    if output_parameters:
        self.output_parameters: OrderedDict[str, Union[str, Array]] = output_parameters
    else:
        self.output_parameters: OrderedDict[str, Union[str, Array]] = OrderedDict()

    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}

struct

Contains the Struct class.

Struct(name='', attributes=None, context=None) dataclass

Represents a Struct in the PFDL.

Data container for Services and TaskCalls. Used both for Struct definitions and instantiated Structs.

Attributes:

Name Type Description
name str

A string representing the name of the Struct.

attributes Dict[str, Union[str, Array, Struct]]

A dict which maps the attribute names to the defined type or a value (if it is an instantiated Struct).

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
name str

A string representing the name of the Struct.

''
attributes Dict[str, Union[str, Array, Struct]]

A dict which maps the attribute names to the defined type or a value (if it is an instantiated Struct).

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/struct.py
def __init__(
    self,
    name: str = "",
    attributes: Dict[str, Union[str, Array, "Struct"]] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        name: A string representing the name of the Struct.
        attributes: A dict which maps the attribute names to the defined type
                    or a value (if it is an instantiated Struct).
        context: ANTLR context object of this class.
    """
    self.name: str = name
    if attributes:
        self.attributes: Dict[str, Union[str, Array, "Struct"]] = attributes
    else:
        self.attributes: Dict[str, Union[str, Array, "Struct"]] = {}
    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}
from_json(json_string, error_handler, struct_context) classmethod

Creates a Struct instance out of the given JSON string.

Parameters:

Name Type Description Default
json_string str

A JSON string describing the Struct.

required
error_handler ErrorHandler

An ErrorHandler instance used for printing errors.

required
struct_context ParserRuleContext

The ANTLR struct context the struct corresponds to.

required

Returns:

Type Description
Struct

The Struct which was created from the JSON string.

Source code in pfdl_scheduler/model/struct.py
@classmethod
def from_json(
    cls, json_string: str, error_handler: ErrorHandler, struct_context: ParserRuleContext
) -> "Struct":
    """Creates a Struct instance out of the given JSON string.

    Args:
        json_string: A JSON string describing the Struct.
        error_handler: An ErrorHandler instance used for printing errors.
        struct_context: The ANTLR struct context the struct corresponds to.

    Returns:
        The Struct which was created from the JSON string.
    """
    json_object = json.loads(json_string)
    struct = parse_json(json_object, error_handler, struct_context)
    return struct
parse_json(json_object, error_handler, struct_context)

Parses the JSON Struct initialization.

Parameters:

Name Type Description Default
json_object Dict

A JSON object describing the Struct.

required
error_handler ErrorHandler

An ErrorHandler instance used for printing errors.

required
struct_context ParserRuleContext

The ANTLR struct context the struct corresponds to.

required

Returns:

Type Description
Struct

A Struct object representing the initialized Struct.

Source code in pfdl_scheduler/model/struct.py
def parse_json(
    json_object: Dict, error_handler: ErrorHandler, struct_context: ParserRuleContext
) -> Struct:
    """Parses the JSON Struct initialization.

    Args:
        json_object: A JSON object describing the Struct.
        error_handler: An ErrorHandler instance used for printing errors.
        struct_context: The ANTLR struct context the struct corresponds to.

    Returns:
        A Struct object representing the initialized Struct.
    """
    struct = Struct()
    struct.context = struct_context

    for identifier, value in json_object.items():
        if isinstance(value, (int, float, str, bool)):
            struct.attributes[identifier] = value
        elif isinstance(value, list):
            array = Array()
            array.context = struct_context
            struct.attributes[identifier] = array
            for element in value:
                if isinstance(element, (int, float, str, bool)):
                    if isinstance(element, bool):
                        array.type_of_elements = "boolean"
                    elif isinstance(element, (int, float)):
                        array.type_of_elements = "number"
                    else:
                        array.type_of_elements = "string"
                    array.append_value(element)
                elif isinstance(element, dict):
                    inner_struct = parse_json(element, error_handler, struct_context)
                    array.append_value(inner_struct)
        elif isinstance(value, dict):
            inner_struct = parse_json(value, error_handler, struct_context)
            struct.attributes[identifier] = inner_struct
    return struct
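
To make the mapping from JSON values to model objects more concrete, the sketch below follows the same branching as `parse_json` with reduced stand-in classes. `ArraySketch`, `StructSketch` and `parse_json_sketch` are hypothetical names. The error handler and the ANTLR context are left out, and the JSON document is invented for illustration.

```python
import json
from typing import Any, Dict, List


class ArraySketch:
    """Hypothetical, reduced stand-in for the Array model class."""

    def __init__(self) -> None:
        self.type_of_elements: str = ""
        self.values: List[Any] = []

    def append_value(self, value: Any) -> None:
        self.values.append(value)


class StructSketch:
    """Hypothetical, reduced stand-in for the Struct model class."""

    def __init__(self) -> None:
        self.name: str = ""
        self.attributes: Dict[str, Any] = {}


def parse_json_sketch(json_object: Dict[str, Any]) -> StructSketch:
    """Follows the branching of parse_json, without error handler and context."""
    struct = StructSketch()
    for identifier, value in json_object.items():
        if isinstance(value, (int, float, str, bool)):
            struct.attributes[identifier] = value
        elif isinstance(value, list):
            array = ArraySketch()
            struct.attributes[identifier] = array
            for element in value:
                if isinstance(element, bool):
                    # bool is tested before int because True is also an int.
                    array.type_of_elements = "boolean"
                    array.append_value(element)
                elif isinstance(element, (int, float)):
                    array.type_of_elements = "number"
                    array.append_value(element)
                elif isinstance(element, str):
                    array.type_of_elements = "string"
                    array.append_value(element)
                elif isinstance(element, dict):
                    array.append_value(parse_json_sketch(element))
        elif isinstance(value, dict):
            struct.attributes[identifier] = parse_json_sketch(value)
    return struct


document = """
{"id": 7, "tags": ["a", "b"], "flags": [true, false],
 "size": {"width": 2.5}, "note": null}
"""
struct = parse_json_sketch(json.loads(document))
```

Two details follow directly from the branching: a JSON `null` matches none of the branches, so `"note"` does not appear in `struct.attributes`, and the resulting Struct keeps its default empty name because the JSON object carries no name.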

task

Contains the Task class.

Task(name='', statements=None, variables=None, input_parameters=None, output_parameters=None, context=None) dataclass

Represents a Task in the PFDL.

A Task contains statements which are executed sequentially. It is possible to define input and output parameters of a Task.

Attributes:

Name Type Description
name str

A string representing the name of the Task.

statements List[Union[Service, TaskCall, Loop, Condition]]

List of statements which are executed sequentially.

variables Dict[str, Union[str, Array]]

Dict for mapping variable names with their values.

input_parameters OrderedDict[str, Union[str, Array]]

OrderedDict for mapping input parameter names with their values.

output_parameters List[str]

List of variable names as output parameters.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
name str

A string representing the name of the Task.

''
statements List[Union[Service, TaskCall, Loop, Condition]]

List of statements which are executed sequentially.

None
variables Dict[str, Union[str, Array]]

Dict for mapping variable names with their values.

None
input_parameters OrderedDict[str, Union[str, Array]]

OrderedDict for mapping input parameter names with their values.

None
output_parameters List[str]

List of variable names as output parameters.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/task.py
def __init__(
    self,
    name: str = "",
    statements: List[Union[Service, TaskCall, Loop, Condition]] = None,
    variables: Dict[str, Union[str, Array]] = None,
    input_parameters: OrderedDict[str, Union[str, Array]] = None,
    output_parameters: List[str] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        name: A string representing the name of the Task.
        statements: List of statements which are executed sequentially.
        variables: Dict for mapping variable names with their values.
        input_parameters: OrderedDict for mapping input parameter names with their values.
        output_parameters: List of variable names as output parameters.
        context: ANTLR context object of this class.
    """
    self.name: str = name

    if statements:
        self.statements: List[Union[Service, TaskCall, Loop, Condition]] = statements
    else:
        self.statements: List[Union[Service, TaskCall, Loop, Condition]] = []

    if variables:
        self.variables: Dict[str, Union[str, Array]] = variables
    else:
        self.variables: Dict[str, Union[str, Array]] = {}

    if input_parameters:
        self.input_parameters: OrderedDict[str, Union[str, Array]] = input_parameters
    else:
        self.input_parameters: OrderedDict[str, Union[str, Array]] = {}

    if output_parameters:
        self.output_parameters: List[str] = output_parameters
    else:
        self.output_parameters: List[str] = []

    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}

task_call

Contains the TaskCall class.

TaskCall(name='', input_parameters=None, output_parameters=None, context=None) dataclass

Represents a TaskCall in the PFDL.

Provides information about the name and call parameters of a Task which is called within another Task.

Attributes:

Name Type Description
name str

A string representing the name of the TaskCall.

input_parameters List[Union[str, List[str], Struct]]

List of input parameters of the TaskCall.

output_parameters Dict[str, Union[str, Array]]

List of output parameters of the TaskCall.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
name str

A string representing the name of the TaskCall.

''
input_parameters List[Union[str, List[str], Struct]]

List of input parameters of the TaskCall.

None
output_parameters Dict[str, Union[str, Array]]

List of output parameters of the TaskCall.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/task_call.py
def __init__(
    self,
    name: str = "",
    input_parameters: List[Union[str, List[str], Struct]] = None,
    output_parameters: Dict[str, Union[str, Array]] = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        name: A string representing the name of the TaskCall.
        input_parameters: List of input parameters of the TaskCall.
        output_parameters: List of output parameters of the TaskCall.
        context: ANTLR context object of this class.
    """
    self.name: str = name

    if input_parameters:
        self.input_parameters: List[Union[str, List[str], Struct]] = input_parameters
    else:
        self.input_parameters: List[Union[str, List[str], Struct]] = []

    if output_parameters:
        self.output_parameters: Dict[str, Union[str, Array]] = output_parameters
    else:
        self.output_parameters: Dict[str, Union[str, Array]] = {}

    self.context: ParserRuleContext = context
    self.context_dict: Dict = {}

while_loop

Contains the WhileLoop class.

WhileLoop(statements=None, expression=None, context=None) dataclass

Bases: Loop

Represents a While Loop in the PFDL.

Loops as long as the conditional statement (expression) is satisfied.

Attributes:

Name Type Description
statements

List of statements inside the loop body.

expression Dict

Boolean expression in form of a dict.

context ParserRuleContext

ANTLR context object of this class.

context_dict Dict

Maps other attributes with ANTLR context objects.

Initialize the object.

Parameters:

Name Type Description Default
statements List[Union[Service, TaskCall, Loop, Condition]]

List of statements inside the loop body.

None
expression Dict

Boolean expression in form of a dict.

None
context ParserRuleContext

ANTLR context object of this class.

None
Source code in pfdl_scheduler/model/while_loop.py
def __init__(
    self,
    statements: List[Union[Service, TaskCall, Loop, Condition]] = None,
    expression: Dict = None,
    context: ParserRuleContext = None,
) -> None:
    """Initialize the object.

    Args:
        statements: List of statements inside the loop body.
        expression: Boolean expression in form of a dict.
        context: ANTLR context object of this class.
    """
    Loop.__init__(self, statements, context)

    if expression:
        self.expression: Dict = expression
    else:
        self.expression: Dict = {}

parser

pfdl_tree_visitor

Contains PFDLTreeVisitor class.

PFDLTreeVisitor(error_handler)

Bases: PFDLParserVisitor

Traverses the given parse tree and stores program information in a Process object.

This class overrides the generated visitor methods from the ANTLR generated PFDLParserVisitor. A Process object is created and gets filled while traversing the syntax tree.

Attributes:

Name Type Description
error_handler ErrorHandler

ErrorHandler instance for printing errors while visiting.

current_task Task

Reference to the currently visited Task. Every visitor method can access it.

Initialize the object.

Parameters:

Name Type Description Default
error_handler ErrorHandler

ErrorHandler instance for printing errors while visiting.

required
Source code in pfdl_scheduler/parser/pfdl_tree_visitor.py
def __init__(self, error_handler: ErrorHandler) -> None:
    """Initialize the object.

    Args:
        error_handler: ErrorHandler instance for printing errors while visiting.
    """
    self.error_handler: ErrorHandler = error_handler
    self.current_task: Task = None
visitProgram(ctx)

Starts the visiting of the syntax tree of the given PFDL program.

Source code in pfdl_scheduler/parser/pfdl_tree_visitor.py
def visitProgram(self, ctx) -> Process:
    """Starts the visiting of the syntax tree of the given PFDL program."""
    process = Process()

    if ctx.children:
        for child in ctx.children:
            process_component = self.visit(child)

            if isinstance(process_component, Struct):
                if process_component.name not in process.structs:
                    process.structs[process_component.name] = process_component
                else:
                    error_msg = (
                        f"A Struct with the name '{process_component.name}' "
                        "is already defined"
                    )
                    self.error_handler.print_error(error_msg, context=child)
            elif isinstance(process_component, Task):
                if process_component.name not in process.tasks:
                    process.tasks[process_component.name] = process_component
                else:
                    error_msg = (
                        f"A Task with the name '{process_component.name}' " "is already defined"
                    )
                    self.error_handler.print_error(error_msg, context=child)
    return process
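
The duplicate handling in `visitProgram` keeps the first definition of a name and reports every later one. The following is a minimal sketch of that rule, assuming a hypothetical helper `collect_unique` that works on plain names instead of `Struct` and `Task` objects and collects messages in a list instead of calling the error handler.

```python
from typing import Dict, List, Tuple


def collect_unique(names: List[str]) -> Tuple[Dict[str, int], List[str]]:
    """Keep the first definition per name and report every later duplicate."""
    registry: Dict[str, int] = {}
    errors: List[str] = []
    for position, name in enumerate(names):
        if name not in registry:
            registry[name] = position
        else:
            errors.append(f"A Task with the name '{name}' is already defined")
    return registry, errors


registry, errors = collect_unique(["productionTask", "paint", "paint"])
```

Here the second `paint` is rejected and the first one stays registered, which matches the branch structure of the method above.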

petri_net

callbacks

Contains the PetriNetCallbacks class.

PetriNetCallbacks(task_started=None, service_started=None, service_finished=None, condition_started=None, while_loop_started=None, counting_loop_started=None, parallel_loop_started=None, task_finished=None) dataclass

Internal callback functions that can be registered in the petri net.

Attributes:

Name Type Description
task_started Callable[[TaskAPI], Any]

Callback function which gets called when a task is started.

service_started Callable[[ServiceAPI], Any]

Callback function which gets called when a service is started.

service_finished Callable[[ServiceAPI], Any]

Callback function which gets called when a service is finished.

condition_started Callable[[Condition, str, str, TaskAPI], Any]

Callback function which gets called when a condition is started.

while_loop_started Callable[[WhileLoop, str, str, TaskAPI], Any]

Callback function which gets called when a while loop is started.

counting_loop_started Callable[[CountingLoop, str, str, TaskAPI], Any]

Callback function which gets called when a counting loop is started.

parallel_loop_started Callable[[CountingLoop, TaskAPI, List, str, str], Any]

Callback function which gets called when a parallel loop is started.

task_finished Callable[[TaskAPI], Any]

Callback function which gets called when a task is finished.
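
Since every field defaults to `None`, callbacks can be assigned one at a time after construction. The following is a minimal sketch of that usage, assuming a hypothetical stand-in dataclass `CallbacksSketch` with only two of the fields and plain strings in place of `TaskAPI` objects.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class CallbacksSketch:
    """Hypothetical stand-in with two of the callback fields."""

    task_started: Optional[Callable[[str], Any]] = None
    task_finished: Optional[Callable[[str], Any]] = None


events: List[str] = []
callbacks = CallbacksSketch()
callbacks.task_started = lambda task: events.append(f"started {task}")
callbacks.task_finished = lambda task: events.append(f"finished {task}")

# Only invoke fields that were actually registered.
for callback in (callbacks.task_started, callbacks.task_finished):
    if callback is not None:
        callback("productionTask")
```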

drawer

Functions defined here set attributes for drawing the petri net.

draw_arcs(arc, attr)

Set attributes for drawing arcs.

Source code in pfdl_scheduler/petri_net/drawer.py
def draw_arcs(arc, attr):
    """Set attributes for drawing arcs."""
    if isinstance(arc, snakes.nets.Inhibitor):
        attr["arrowhead"] = INHIBITOR_ARC_ARROW_HEAD
    attr["label"] = DEFAULT_ARC_LABEL
draw_graph(graph, attr)

Set attributes for drawing the net.

Source code in pfdl_scheduler/petri_net/drawer.py
def draw_graph(graph, attr):
    """Set attributes for drawing the net."""
    attr["margin"] = GRAPH_MARGIN
    attr["newrank"] = NEW_RANK_VALUE
draw_petri_net(net, file_path, file_ending='.png')

Calls the draw method from the Snakes module on the given PetriNet.

Source code in pfdl_scheduler/petri_net/drawer.py
def draw_petri_net(net, file_path, file_ending=".png"):
    """Calls the draw method from the Snakes module on the given PetriNet."""
    with draw_lock:
        net.draw(
            file_path + file_ending,
            LAYOUT_METHOD,
            graph_attr=draw_graph,
            arc_attr=draw_arcs,
            place_attr=draw_place,
            trans_attr=draw_transition,
            cluster_attr=draw_clusters,
        )
draw_place(place, attr)

Set attributes for drawing places.

Source code in pfdl_scheduler/petri_net/drawer.py
def draw_place(place, attr):
    """Set attributes for drawing places."""
    if place.label("name") != "":
        attr["xlabel"] = place.label("name")
    else:
        attr["xlabel"] = place.name

    attr["group"] = place.label("group_id")

    if 1 in place:
        attr["label"] = "•"
    else:
        attr["label"] = PLACE_LABEL
    attr["shape"] = PLACE_SHAPE
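
The drawer functions all follow the same shape: they receive a net element and a mutable attribute dict and write drawing attributes into that dict. The following is a minimal sketch of the `xlabel` fallback from `draw_place`, assuming a hypothetical `PlaceSketch` class that exposes only `name` and `label()`; it is not the SNAKES place type.

```python
from typing import Dict


class PlaceSketch:
    """Hypothetical stand-in exposing the two members the fallback reads."""

    def __init__(self, name: str, labels: Dict[str, str]) -> None:
        self.name = name
        self._labels = labels

    def label(self, key: str) -> str:
        return self._labels[key]


def place_xlabel(place: PlaceSketch, attr: Dict[str, str]) -> None:
    """Prefer the 'name' label and fall back to the place name if it is empty."""
    if place.label("name") != "":
        attr["xlabel"] = place.label("name")
    else:
        attr["xlabel"] = place.name


labelled: Dict[str, str] = {}
place_xlabel(PlaceSketch("p1", {"name": "Passed"}), labelled)

unlabelled: Dict[str, str] = {}
place_xlabel(PlaceSketch("p2", {"name": ""}), unlabelled)
```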
draw_transition(trans, attr)

Set attributes for drawing transitions.

Source code in pfdl_scheduler/petri_net/drawer.py
def draw_transition(trans, attr):
    """Set attributes for drawing transitions."""

    attr["label"] = ""  # TRANSITION_LABEL
    attr["shape"] = TRANSITION_SHAPE
    attr["height"] = TRANSITION_HEIGHT
    attr["width"] = TRANSITION_WIDTH
    attr["fillcolor"] = TRANSITION_FILL_COLOR
    attr["group"] = trans.label("group_id")

generator

Contains the PetriNetGenerator class.

PetriNetGenerator(path_for_image='', used_in_extension=False, generate_test_ids=False, draw_net=True, file_name='petri_net')

Generates a Petri Net from a given Process object which corresponds to a PFDL file.

Attributes:

Name Type Description
path_for_image str

The path where the image of the generated Petri Net is saved.

net PetriNet

The snakes Petri Net instance.

tasks Dict[str, Task]

A dict representing the Tasks of the given Process object.

transition_dict OrderedDict

A dict for mapping the UUIDs of the Transitions to their behavior.

place_dict Dict

A dict for mapping the service id to the place name.

task_started_id str

The id of the 'Task started' place.

callbacks PetriNetCallbacks

A PetriNetCallbacks instance representing functions called during execution.

generate_test_ids bool

A boolean indicating if test ids (counting from 0) should be generated.

used_in_extension bool

A boolean indicating if the Generator is used within the extension.

Initialize the object.

Parameters:

Name Type Description Default
path_for_image str

The path where the image of the generated Petri Net is saved.

''
used_in_extension bool

A boolean indicating if the Generator is used within the extension.

False
generate_test_ids bool

A boolean indicating if test ids (counting from 0) should be generated.

False
draw_net bool

A boolean indicating if the petri net should be drawn.

True
Source code in pfdl_scheduler/petri_net/generator.py
def __init__(
    self,
    path_for_image: str = "",
    used_in_extension: bool = False,
    generate_test_ids: bool = False,
    draw_net: bool = True,
    file_name: str = "petri_net",
) -> None:
    """Initialize the object.

    Args:
        path_for_image: The path where the image of the generated Petri Net is saved.
        used_in_extension: A boolean indicating if the Generator is used within the extension.
        generate_test_ids: A boolean indicating if test ids (counting from 0) should be generated.
        draw_net: A boolean indicating if the petri net should be drawn.
    """

    if used_in_extension:
        self.path_for_image: str = "../media/" + file_name
    elif path_for_image == "":
        Path("./temp").mkdir(parents=True, exist_ok=True)
        self.path_for_image: str = "temp/" + file_name
    else:
        Path("./temp").mkdir(parents=True, exist_ok=True)
        self.path_for_image: str = path_for_image + "/temp/" + file_name

    self.net: PetriNet = PetriNet("petri_net")
    self.draw_net: bool = draw_net
    self.tasks: Dict[str, Task] = None
    self.transition_dict: OrderedDict = OrderedDict()
    self.place_dict: Dict = {}
    self.task_started_id: str = ""
    self.callbacks: PetriNetCallbacks = PetriNetCallbacks()
    self.generate_test_ids: bool = generate_test_ids
    self.used_in_extension: bool = used_in_extension
    self.tree = None
    self.file_name = file_name
    self.cluster = None
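
The three branches at the start of the constructor decide where the image is written. The following is a minimal sketch of that decision as a pure function, assuming a hypothetical helper name `resolve_image_path`; it leaves out the directory creation that the constructor performs.

```python
def resolve_image_path(
    path_for_image: str = "",
    used_in_extension: bool = False,
    file_name: str = "petri_net",
) -> str:
    """Return the image path prefix chosen by the three constructor branches."""
    if used_in_extension:
        # The extension flag takes precedence over any given path.
        return "../media/" + file_name
    if path_for_image == "":
        return "temp/" + file_name
    return path_for_image + "/temp/" + file_name


default_path = resolve_image_path()
custom_path = resolve_image_path("out")
extension_path = resolve_image_path("out", used_in_extension=True)
```

The returned value has no file ending; `draw_petri_net` appends it when the net is drawn.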
add_callback(transition_id, callback_function, *args)

Registers the given callback function in the transition_dict.

If the transition the transition_id refers to is fired, the callback function will be called with the given arguments inside the PetriNetLogic class.

Parameters:

Name Type Description Default
transition_id str

The UUID of the transition where the callback is called if fired.

required
callback_function Callable

The callback function which should be called.

required
*args Any

Arguments with which the callback function is called.

()
Source code in pfdl_scheduler/petri_net/generator.py
def add_callback(self, transition_id: str, callback_function: Callable, *args: Any) -> None:
    """Registers the given callback function in the transition_dict.

    If the transition the transition_id refers to is fired, the callback function
    will be called with the given arguments inside the PetriNetLogic class.

    Args:
        transition_id: The UUID of the transition where the callback is called if fired.
        callback_function: The callback function which should be called.
        *args: Arguments with which the callback function is called.
    """
    if not self.used_in_extension:
        callback = functools.partial(callback_function, *args)
        if transition_id not in self.transition_dict:
            self.transition_dict[transition_id] = []

        self.transition_dict[transition_id].append(callback)
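
`add_callback` binds the arguments at registration time with `functools.partial`, so the stored callables can later be invoked without arguments. The following is a minimal sketch of that mechanism with module-level state instead of a generator instance. The `fire` function is an assumption about how the stored callbacks might be invoked; the actual firing logic lives in the PetriNetLogic class and is not shown in this section.

```python
import functools
from collections import OrderedDict
from typing import Any, Callable, List

transition_dict: "OrderedDict[str, List[Callable[[], Any]]]" = OrderedDict()
log: List[str] = []


def add_callback(transition_id: str, callback_function: Callable, *args: Any) -> None:
    """Bind the arguments now and queue the callback under the transition id."""
    callback = functools.partial(callback_function, *args)
    transition_dict.setdefault(transition_id, []).append(callback)


def fire(transition_id: str) -> None:
    """Hypothetical firing step: call every queued callback in registration order."""
    for callback in transition_dict.get(transition_id, []):
        callback()


add_callback("t1", lambda task: log.append(f"task_started {task}"), "productionTask")
add_callback("t1", lambda service: log.append(f"service_started {service}"), "paint")
fire("t1")
```

Several callbacks can share one transition id, and they are stored in the order in which they were registered.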
generate_condition(condition, task_context, first_transition_id, second_transition_id, node, in_loop=False)

Generate Petri Net components for the Condition statement.

Returns:

Type Description
List[str]

The ids of the last transitions of the Condition petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_condition(
    self,
    condition: Condition,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
    in_loop: bool = False,
) -> List[str]:
    """Generate Petri Net components for the Condition statement.

    Returns:
        The ids of the last transitions of the Condition petri net component.
    """
    group_id = str(uuid.uuid4())
    condition_node = Node(group_id, "Condition", node)

    passed_id = create_place("Passed", self.net, group_id)
    failed_id = create_place("Failed", self.net, group_id)

    expression_id = create_place(
        "If " + self.parse_expression(condition.expression),
        self.net,
        group_id,
    )

    first_passed_transition_id = create_transition("", "", self.net, group_id)
    first_failed_transition_id = create_transition("", "", self.net, group_id)

    self.net.add_input(expression_id, first_passed_transition_id, Value(1))
    self.net.add_input(expression_id, first_failed_transition_id, Value(1))

    self.net.add_input(passed_id, first_passed_transition_id, Value(1))
    self.net.add_input(failed_id, first_failed_transition_id, Value(1))

    finished_id = create_place("Condition_Finished", self.net, group_id)

    second_passed_transition_id = create_transition("", "", self.net, group_id)
    self.net.add_output(finished_id, second_passed_transition_id, Value(1))

    cluster = Cluster([passed_id, failed_id, expression_id, finished_id])
    node.cluster.add_child(cluster)

    cluster_passed = Cluster([first_passed_transition_id, second_passed_transition_id])
    cluster_failed = Cluster([first_failed_transition_id])
    cluster.add_child(cluster_passed)
    cluster.add_child(cluster_failed)
    condition_node.cluster = cluster_passed

    self.generate_statements(
        task_context,
        condition.passed_stmts,
        first_passed_transition_id,
        second_passed_transition_id,
        condition_node,
        in_loop,
    )

    self.net.add_output(expression_id, first_transition_id, Value(1))
    self.net.add_input(finished_id, second_transition_id, Value(1))

    args = (condition, passed_id, failed_id, task_context)
    self.add_callback(first_transition_id, self.callbacks.condition_started, *args)

    if condition.failed_stmts:
        condition_node.cluster = cluster_failed
        second_failed_transition_id = create_transition("", "", self.net, group_id)
        cluster_failed.add_node(second_failed_transition_id)
        self.generate_statements(
            task_context,
            condition.failed_stmts,
            first_failed_transition_id,
            second_failed_transition_id,
            condition_node,
            in_loop,
        )

        self.net.add_output(finished_id, second_failed_transition_id, Value(1))
        return [second_passed_transition_id, second_failed_transition_id]
    else:
        self.net.add_output(finished_id, first_failed_transition_id, Value(1))
        return [second_passed_transition_id, first_failed_transition_id]
generate_counting_loop(loop, task_context, first_transition_id, second_transition_id, node, in_loop=False)

Generates the Petri Net components for a Counting Loop.

Returns:

Type Description
str

The id of the last transition of the CountingLoop petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_counting_loop(
    self,
    loop: CountingLoop,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
    in_loop: bool = False,
) -> str:
    """Generates the Petri Net components for a Counting Loop.

    Returns:
        The id of the last transition of the CountingLoop petri net component.
    """
    if loop.parallel:
        return self.generate_parallel_loop(
            loop, task_context, first_transition_id, second_transition_id, node
        )

    group_id = str(uuid.uuid4())
    counting_loop_node = Node(group_id, "Counting Loop", node)
    loop_id = create_place("Loop", self.net, group_id)

    loop_text = "Loop"

    loop_statements_id = create_place(loop_text, self.net, group_id)
    loop_finished_id = create_place("Number of Steps Done", self.net, group_id)

    condition_passed_transition_id = create_transition("", "", self.net, group_id)
    condition_failed_transition_id = create_transition("", "", self.net, group_id)
    iteration_step_done_transition_id = create_transition("", "", self.net, group_id)

    self.net.add_input(loop_id, condition_passed_transition_id, Value(1))
    self.net.add_input(loop_statements_id, condition_passed_transition_id, Value(1))
    self.net.add_input(loop_id, condition_failed_transition_id, Value(1))
    self.net.add_input(loop_finished_id, condition_failed_transition_id, Value(1))
    self.net.add_output(loop_id, iteration_step_done_transition_id, Value(1))

    loop_done_id = create_place("Loop Done", self.net, group_id)

    cluster = Cluster(
        [
            loop_id,
            loop_statements_id,
            loop_finished_id,
            condition_passed_transition_id,
            condition_failed_transition_id,
            iteration_step_done_transition_id,
            loop_done_id,
        ]
    )

    node.cluster.add_child(cluster)
    counting_loop_node.cluster = cluster

    self.generate_statements(
        task_context,
        loop.statements,
        condition_passed_transition_id,
        iteration_step_done_transition_id,
        counting_loop_node,
        True,
    )

    self.net.add_output(loop_done_id, condition_failed_transition_id, Value(1))
    self.net.add_output(loop_id, first_transition_id, Value(1))
    self.net.add_input(loop_done_id, second_transition_id, Value(1))

    args = (loop, loop_statements_id, loop_finished_id, task_context)
    self.add_callback(first_transition_id, self.callbacks.counting_loop_started, *args)
    self.add_callback(
        iteration_step_done_transition_id, self.callbacks.counting_loop_started, *args
    )

    return condition_failed_transition_id
generate_parallel(parallel, task_context, first_transition_id, second_transition_id, node, in_loop=False)

Generate the Petri Net components for a Parallel statement.

Returns:

Type Description
str

The id of the last transition of the Parallel petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_parallel(
    self,
    parallel: Parallel,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
    in_loop: bool = False,
) -> str:
    """Generate the Petri Net components for a Parallel statement.

    Returns:
        The id of the last transition of the Parallel petri net component.
    """

    group_id = str(uuid.uuid4())
    parallel_node = Node(group_id, "Parallel", node)

    sync_id = create_transition("", "", self.net, group_id)
    parallel_finished_id = create_place("Parallel finished", self.net, group_id)

    cluster = Cluster([], Cluster([sync_id, parallel_finished_id]))
    node.cluster.add_child(cluster)
    parallel_node.cluster = cluster

    for task_call in parallel.task_calls:
        parallel_cluster = Cluster([])
        cluster.add_child(parallel_cluster)
        parallel_node.cluster = parallel_cluster
        self.generate_task_call(
            task_call, task_context, first_transition_id, sync_id, parallel_node, in_loop
        )

    self.net.add_output(parallel_finished_id, sync_id, Value(1))
    self.net.add_input(parallel_finished_id, second_transition_id, Value(1))
    return sync_id
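
Every task call of the Parallel statement is generated with `sync_id` as its closing transition, so that transition acts as a join. The following is a minimal sketch of the general Petri net enabling rule behind this, assuming hypothetical place names and a plain dict as marking; it does not use the SNAKES API.

```python
from typing import Dict, List


def enabled(input_places: List[str], marking: Dict[str, int]) -> bool:
    """A transition is enabled once every input place holds at least one token."""
    return all(marking.get(place, 0) > 0 for place in input_places)


branches = ["paint done", "dry done"]  # hypothetical final places of two branches
marking = {"paint done": 1, "dry done": 0}

before = enabled(branches, marking)  # one branch is still running
marking["dry done"] = 1
after = enabled(branches, marking)  # both branches have finished
```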
generate_parallel_loop(loop, task_context, first_transition_id, second_transition_id, node)

Generates the static petri net components for a ParallelLoop.

This method will generate a placeholder for the ParallelLoop. The actual number of Tasks started in parallel is only known at runtime.

Returns:

Type Description
str

The id of the last transition of the ParallelLoop petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_parallel_loop(
    self,
    loop: CountingLoop,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
) -> str:
    """Generates the static petri net components for a ParallelLoop.

    This method will generate a placeholder for the ParallelLoop. The actual number
    of Tasks started in parallel is only known at runtime.

    Returns:
        The id of the last transition of the ParallelLoop petri net component.
    """

    group_id = str(uuid.uuid4())
    parallel_loop_node = Node(group_id, "Parallel Loop", node)

    parallel_loop_started = create_place(
        "Start " + loop.statements[0].name + " in parallel",
        self.net,
        group_id,
    )
    cluster = Cluster([parallel_loop_started])
    node.cluster.add_child(cluster)
    parallel_loop_node.cluster = cluster
    self.net.add_output(parallel_loop_started, first_transition_id, Value(1))
    self.net.add_input(parallel_loop_started, second_transition_id, Value(1))

    args = (
        loop,
        task_context,
        loop.statements[0],
        parallel_loop_started,
        first_transition_id,
        second_transition_id,
        parallel_loop_node,
    )
    self.add_callback(first_transition_id, self.callbacks.parallel_loop_started, *args)
    return second_transition_id
generate_petri_net(process)

Generates a Petri Net from the given Process object.

Starts from the 'productionTask' and iterates over its statements. For each statement type, such as Condition or TaskCall, a Petri Net component is generated. All components get connected at the end.

Returns:

Type Description
PetriNet

A PetriNet instance representing the generated net.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_petri_net(self, process: Process) -> PetriNet:
    """Generates a Petri Net from the given Process object.

    Starts from the 'productionTask' and iterates over its statements.
    For each statement type like Condition or TaskCall a Petri Net component
    is generated. All components get connected at the end.

    Returns:
        A PetriNet instance representing the generated net.
    """
    self.tasks = process.tasks
    for task in process.tasks.values():
        if task.name == "productionTask":
            group_id = str(uuid.uuid4())
            self.tree = Node(group_id, task.name)

            task_context = TaskAPI(task, None)
            if self.generate_test_ids:
                task_context.uuid = "0"

            self.task_started_id = create_place(
                task.name + "_started", self.net, group_id, [0, 0]
            )
            connection_id = create_transition("", "", self.net, group_id)

            self.add_callback(connection_id, self.callbacks.task_started, task_context)

            self.net.add_input(self.task_started_id, connection_id, Value(1))
            self.task_finished_id = create_place(task.name + "_finished", self.net, group_id)

            second_connection_id = create_transition("", "", self.net, group_id)

            self.tree.cluster = Cluster(
                [
                    self.task_started_id,
                    connection_id,
                    second_connection_id,
                    self.task_finished_id,
                ]
            )
            self.generate_statements(
                task_context, task.statements, connection_id, second_connection_id, self.tree
            )
            self.net.add_output(self.task_finished_id, second_connection_id, Value(1))

            self.add_callback(second_connection_id, self.callbacks.task_finished, task_context)

    # assign new clusters before drawing
    self.net.clusters = self.tree.cluster

    if self.draw_net:
        json_string = json.dumps(self.tree.toJSON(), indent=4)
        draw_petri_net(self.net, self.path_for_image, ".dot")
        draw_petri_net(self.net, self.path_for_image, ".png")
        with open(self.path_for_image + ".dot", "a") as file:
            file.write("\ncall_tree:")
            file.write(json_string)

    return self.net
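
When drawing is enabled, the method appends a `call_tree:` marker and the JSON call tree to the end of the generated `.dot` file. The following is a minimal sketch of how a consumer could separate the two parts again. The file content and the shape of the tree dict are hypothetical, since the output of `toJSON` is not shown in this section.

```python
import json

# Hypothetical file content: DOT text followed by the appended call tree.
tree = {"id": "0", "name": "productionTask", "children": []}
dot_text = "digraph {\n}\n" + "\ncall_tree:" + json.dumps(tree, indent=4)

# Split at the marker that generate_petri_net writes before the JSON string.
graph_part, _, tree_part = dot_text.partition("\ncall_tree:")
call_tree = json.loads(tree_part)
```

Because of the appended text, a tool that expects pure DOT input would presumably need the graph part only.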
generate_service(service, task_context, first_transition_id, second_transition_id, node, in_loop=False)

Generate the Petri Net components for a Service Call.

Returns:

Type Description
str

The id of the last transition of the Service petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_service(
    self,
    service: Service,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
    in_loop: bool = False,
) -> str:
    """Generate the Petri Net components for a Service Call.

    Returns:
        The id of the last transition of the Service petri net component.
    """
    group_id = str(uuid.uuid4())
    Node(group_id, service.name, node)

    service_api = ServiceAPI(service, task_context, in_loop=in_loop)

    service_started_id = create_place(service.name + " started", self.net, group_id)
    service_finished_id = create_place(service.name + " finished", self.net, group_id)

    self.place_dict[service_api.uuid] = service_finished_id

    service_done_id = create_place(service.name + " done", self.net, group_id)
    service_done_transition_id = create_transition("service_done", "", self.net, group_id)

    self.add_callback(first_transition_id, self.callbacks.service_started, service_api)
    self.add_callback(service_done_transition_id, self.callbacks.service_finished, service_api)

    self.net.add_input(service_started_id, service_done_transition_id, Value(1))
    self.net.add_input(service_finished_id, service_done_transition_id, Value(1))
    self.net.add_output(service_done_id, service_done_transition_id, Value(1))

    self.net.add_output(service_started_id, first_transition_id, Value(1))
    self.net.add_input(service_done_id, second_transition_id, Value(1))

    node.cluster.add_child(
        (
            Cluster(
                [
                    service_started_id,
                    service_finished_id,
                    service_done_transition_id,
                    service_done_id,
                ]
            )
        )
    )
    return service_done_transition_id
generate_statements(task_context, statements, first_connection_id, last_connection_id, node, in_loop=False)

Generate Petri Net components for each statement in the given Task.

Iterate over the statements of the given Task and generate the corresponding Petri Net components. Connect the individual components with each other via a transition.

Returns:

Type Description
List[str]

The ids of the last connections (transitions).

Source code in pfdl_scheduler/petri_net/generator.py
def generate_statements(
    self,
    task_context: TaskAPI,
    statements: List,
    first_connection_id: str,
    last_connection_id: str,
    node: Node,
    in_loop: bool = False,
) -> List[str]:
    """Generate Petri Net components for each statement in the given Task.

    Iterate over the statements of the given Task and generate the corresponding
    Petri Net components. Connect the individual components with each other via a
    transition.

    Returns:
        The ids of the last connections (transitions).
    """
    current_connection_id = ""
    previous_connection_id = first_connection_id

    connection_ids = []
    for i, statement in enumerate(statements):
        multiple_statements = len(statements) > 1

        # only create a transition when there is more than 1 statement
        # and we are not in the last iteration
        if multiple_statements:
            if i < len(statements) - 1:
                current_connection_id = create_transition(
                    "connection", "", self.net, node.group_id
                )
                node.cluster.add_node(current_connection_id)
            else:
                current_connection_id = last_connection_id
        else:
            previous_connection_id = first_connection_id
            current_connection_id = last_connection_id

        args = (
            statement,
            task_context,
            previous_connection_id,
            current_connection_id,
            node,
            in_loop,
        )

        if isinstance(statement, Service):
            connection_ids = [self.generate_service(*args)]
        elif isinstance(statement, TaskCall):
            connection_ids = self.generate_task_call(*args)
        elif isinstance(statement, Parallel):
            connection_ids = [self.generate_parallel(*args)]
        elif isinstance(statement, CountingLoop):
            connection_ids = [self.generate_counting_loop(*args)]
        elif isinstance(statement, WhileLoop):
            connection_ids = [self.generate_while_loop(*args)]
        else:
            connection_ids = self.generate_condition(*args)

        previous_connection_id = current_connection_id

    return connection_ids
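
The loop above chains the statements: each statement starts at the transition where the previous one ended, and only the last statement ends at `last_connection_id`. The following is a minimal sketch of that chaining, assuming a hypothetical helper `connection_pairs` and generated names like `connection_0` in place of the ids returned by `create_transition`.

```python
from typing import List, Tuple


def connection_pairs(
    statement_count: int, first_id: str, last_id: str
) -> List[Tuple[str, str]]:
    """Return the (previous, current) transition ids handed to each statement."""
    pairs: List[Tuple[str, str]] = []
    previous = first_id
    for i in range(statement_count):
        if i < statement_count - 1:
            current = f"connection_{i}"  # stand-in for create_transition(...)
        else:
            current = last_id
        pairs.append((previous, current))
        previous = current
    return pairs


single = connection_pairs(1, "first", "last")
chain = connection_pairs(3, "first", "last")
```

A single statement is connected directly between the first and last transition, so no intermediate transition is created for it.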
generate_task_call(task_call, task_context, first_transition_id, second_transition_id, node, in_loop=False)

Generate the Petri Net components for a Task Call.

Returns:

Type Description
List[str]

The ids of the last transitions of the TaskCall petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_task_call(
    self,
    task_call: TaskCall,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
    in_loop: bool = False,
) -> List[str]:
    """Generate the Petri Net components for a Task Call.

    Returns:
        The ids of the last transitions of the TaskCall petri net component.
    """
    called_task = self.tasks[task_call.name]
    new_task_context = TaskAPI(called_task, task_context, task_call=task_call, in_loop=in_loop)

    group_id = str(uuid.uuid4())
    task_node = Node(group_id, task_call.name, node)

    task_cluster = Cluster([])
    node.cluster.add_child(task_cluster)
    task_node.cluster = task_cluster

    # Order for callbacks important: Task starts before statement and finishes after
    self.add_callback(first_transition_id, self.callbacks.task_started, new_task_context)
    last_connection_ids = self.generate_statements(
        new_task_context,
        called_task.statements,
        first_transition_id,
        second_transition_id,
        task_node,
        in_loop,
    )

    for last_connection_id in last_connection_ids:
        self.add_callback(last_connection_id, self.callbacks.task_finished, new_task_context)

    return last_connection_ids
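The callback ordering mentioned in the comment above can be illustrated with a small sketch. It assumes, without confirmation from the listing shown here, that `add_callback` stores `functools.partial` objects in a dict keyed by transition id (`evaluate_petri_net` reads `callback.func.__name__`, which is consistent with that). The transition ids, the `log` list and the three callback functions are hypothetical.

```python
import functools

# Hypothetical stand-in for the generator's transition_dict.
transition_dict = {}
log = []


def add_callback(transition_id, callback, *args):
    """Assumed behaviour: bind the arguments now, call the function later."""
    transition_dict.setdefault(transition_id, []).append(
        functools.partial(callback, *args)
    )


def task_started(name):
    log.append(f"{name} started")


def statement_ran(name):
    log.append(f"{name} statement")


def task_finished(name):
    log.append(f"{name} finished")


# Same registration order as in generate_task_call: task_started first,
# then the statements, then task_finished on the last transition.
add_callback("t_first", task_started, "paint")
add_callback("t_first", statement_ran, "paint")
add_callback("t_last", task_finished, "paint")

# Fire the callbacks transition by transition, in registration order.
for transition_id in ("t_first", "t_last"):
    for callback in transition_dict[transition_id]:
        callback()
```

Because callbacks on the same transition run in registration order, registering `task_started` before generating the statements is what makes the task start before its first statement.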
generate_while_loop(loop, task_context, first_transition_id, second_transition_id, node, in_loop=False)

Generate the Petri Net components for a While Loop.

Returns:

Type Description
str

The id of the last transition of the WhileLoop petri net component.

Source code in pfdl_scheduler/petri_net/generator.py
def generate_while_loop(
    self,
    loop: WhileLoop,
    task_context: TaskAPI,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
    in_loop: bool = False,
) -> str:
    """Generate the Petri Net components for a While Loop.

    Returns:
        The id of the last transition of the WhileLoop petri net component.
    """
    group_id = str(uuid.uuid4())
    while_loop_node = Node(group_id, "While Loop", node)

    loop_id = create_place("Loop", self.net, group_id)

    loop_text = "Loop"

    loop_statements_id = create_place(loop_text, self.net, group_id)
    loop_finished_id = create_place("Number of Steps Done", self.net, group_id)

    condition_passed_transition_id = create_transition("", "", self.net, group_id)
    condition_failed_transition_id = create_transition("", "", self.net, group_id)
    iteration_step_done_transition_id = create_transition("", "", self.net, group_id)

    self.net.add_input(loop_id, condition_passed_transition_id, Value(1))
    self.net.add_input(loop_statements_id, condition_passed_transition_id, Value(1))
    self.net.add_input(loop_id, condition_failed_transition_id, Value(1))
    self.net.add_input(loop_finished_id, condition_failed_transition_id, Value(1))
    self.net.add_output(loop_id, iteration_step_done_transition_id, Value(1))

    loop_done_id = create_place("Loop Done", self.net, group_id)

    cluster = Cluster(
        [
            loop_id,
            loop_statements_id,
            loop_finished_id,
            condition_failed_transition_id,
            loop_done_id,
            condition_passed_transition_id,
            iteration_step_done_transition_id,
        ]
    )

    node.cluster.add_child(cluster)
    while_loop_node.cluster = cluster
    self.generate_statements(
        task_context,
        loop.statements,
        condition_passed_transition_id,
        iteration_step_done_transition_id,
        while_loop_node,
        True,
    )

    self.net.add_output(loop_id, first_transition_id, Value(1))
    self.net.add_input(loop_done_id, second_transition_id, Value(1))

    args = (loop, loop_statements_id, loop_finished_id, task_context)
    self.add_callback(first_transition_id, self.callbacks.while_loop_started, *args)
    self.add_callback(
        iteration_step_done_transition_id, self.callbacks.while_loop_started, *args
    )

    self.net.add_output(loop_done_id, condition_failed_transition_id, Value(1))

    return condition_failed_transition_id
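The structure built above can be tried out with a toy token game. This is a simplified sketch, not the SNAKES-based implementation: `ToyNet`, the place `body` (standing in for whatever `generate_statements` produces) and the `start`/`end` places are assumptions made for illustration, and the callback adds the token directly instead of going through `fire_event`.

```python
from collections import Counter


class ToyNet:
    """Tiny place/transition net in which every arc carries one token."""

    def __init__(self):
        self.marking = Counter()
        self.inputs = {}
        self.outputs = {}

    def add_transition(self, name, inputs, outputs):
        self.inputs[name] = inputs
        self.outputs[name] = outputs

    def enabled(self, name):
        return all(self.marking[place] > 0 for place in self.inputs[name])

    def fire(self, name):
        for place in self.inputs[name]:
            self.marking[place] -= 1
        for place in self.outputs[name]:
            self.marking[place] += 1


def run_while_loop(condition_results):
    """Simulate the while-loop component for a fixed list of condition values."""
    net = ToyNet()
    net.add_transition("first", ["start"], ["loop"])
    net.add_transition("condition_passed", ["loop", "loop_statements"], ["body"])
    net.add_transition("iteration_step_done", ["body"], ["loop"])
    net.add_transition("condition_failed", ["loop", "loop_finished"], ["loop_done"])
    net.add_transition("second", ["loop_done"], ["end"])
    net.marking["start"] = 1

    results = iter(condition_results)

    def while_loop_started():
        # Mirrors the scheduler callback: mark the "then" or the "else" place.
        place = "loop_statements" if next(results) else "loop_finished"
        net.marking[place] += 1

    callbacks = {"first": while_loop_started, "iteration_step_done": while_loop_started}

    trace = []
    progress = True
    while progress:
        progress = False
        for name in net.inputs:
            if net.enabled(name):
                net.fire(name)
                trace.append(name)
                if name in callbacks:
                    callbacks[name]()
                progress = True
                break
    return trace, net.marking


trace, marking = run_while_loop([True, True, False])
```

With two passing evaluations followed by a failing one, `condition_passed` fires twice before `condition_failed` moves the token on to `loop_done`.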
parse_expression(expression)

Parses the given expression to a printable format.

Returns:

Type Description
str

The content of the expression as a formatted string.

Source code in pfdl_scheduler/petri_net/generator.py
def parse_expression(self, expression: Dict) -> str:
    """Parses the given expression to a printable format.

    Returns:
        The content of the expression as a formatted string.
    """
    if isinstance(expression, (str, int, float, bool)):
        return str(expression)
    if isinstance(expression, list):
        list_string = ""
        for element in expression:
            if list_string != "":
                list_string = list_string + "." + element
            else:
                list_string = element
        return list_string
    if len(expression) == 2:
        return "!" + self.parse_expression(expression["value"])
    if expression["left"] == "(" and expression["right"] == ")":
        return "(" + self.parse_expression(expression["binOp"]) + ")"

    return (
        self.parse_expression(expression["left"])
        + expression["binOp"]
        + self.parse_expression(expression["right"])
    )
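As an illustration of the expression shapes this method handles, here is a standalone sketch of the same recursion. The sample expressions are hypothetical; in particular the key name `unOp` is an assumption, since the code above only relies on a negation being a two-entry dict with a `value` key.

```python
def render_expression(expression):
    """Standalone sketch of the recursion used by parse_expression."""
    if isinstance(expression, (str, int, float, bool)):
        return str(expression)
    if isinstance(expression, list):
        # Attribute access such as order.count is stored as a list of names.
        return ".".join(expression)
    if len(expression) == 2:
        return "!" + render_expression(expression["value"])
    if expression["left"] == "(" and expression["right"] == ")":
        return "(" + render_expression(expression["binOp"]) + ")"
    return (
        render_expression(expression["left"])
        + expression["binOp"]
        + render_expression(expression["right"])
    )


# Hypothetical sample input: order.count < 5
comparison = {"left": ["order", "count"], "binOp": "<", "right": 5}
# Hypothetical negation of the parenthesised comparison.
negated = {
    "unOp": "!",
    "value": {"left": "(", "binOp": comparison, "right": ")"},
}

rendered_comparison = render_expression(comparison)
rendered_negated = render_expression(negated)
```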
remove_place_on_runtime(place_id)

Removes a place from the petri net at runtime.

Parameters:

Name Type Description Default
place_id str

The id (as a string) of the place which should be removed from the net.

required
Source code in pfdl_scheduler/petri_net/generator.py
def remove_place_on_runtime(self, place_id: str) -> None:
    """Removes a place from the petri net at runtime.

    Args:
        place_id: The id (as a string) of the place which should be removed from the net.
    """
    if self.net.has_place(place_id):
        # temporary fix
        # self.net.clusters.remove_node(self.task_started_id)
        # self.net.remove_place(self.task_started_id)

        if self.draw_net:
            draw_petri_net(self.net, self.path_for_image)
create_place(name, net, group_id, cluster=[])

Utility function for creating a place with the snakes module.

This function is used to add a place with the given name and to add labels for scheduling (for example, whether the place represents an event or whether it is initialized).

Parameters:

Name Type Description Default
name str

A string representing the displayed name of the place.

required
net PetriNet

The petri net instance this place should be added to.

required
group_id str
required

Returns:

Type Description
str

A UUID as string for the added place.

Source code in pfdl_scheduler/petri_net/generator.py
def create_place(name: str, net: PetriNet, group_id: str, cluster: List = []) -> str:
    """Utility function for creating a place with the snakes module.

    This function is used to add a place with the given name and to add labels for
    scheduling (for example, whether the place represents an event or whether it is
    initialized).

    Args:
        name: A string representing the displayed name of the place.
        net: The petri net instance this place should be added to.
        group_id:

    Returns:
        A UUID as string for the added place.
    """
    place_id = str(uuid.uuid4())
    net.add_place(Place(place_id, []), cluster=cluster)
    net.place(place_id).label(name=name, group_id=group_id)
    return place_id
create_transition(transition_name, transition_type, net, group_id)

Utility function for creating a transition with the snakes module.

This function is used to add a transition with the given name and to add labels for scheduling (currently only the type of the transition).

Parameters:

Name Type Description Default
transition_name str

A string representing the displayed name of the transition.

required
net PetriNet

The petri net instance this transition should be added to.

required
group_id str
required

Returns:

Type Description
str

A UUID as string for the added transition.

Source code in pfdl_scheduler/petri_net/generator.py
def create_transition(
    transition_name: str, transition_type: str, net: PetriNet, group_id: str
) -> str:
    """Utility function for creating a transition with the snakes module.

    This function is used to add a transition with the given name and to add labels for
    scheduling (currently only the type of the transition).

    Args:
        transition_name: A string representing the displayed name of the transition.
        net: The petri net instance this transition should be added to.
        group_id:

    Returns:
        A UUID as string for the added transition.
    """
    transition_id = str(uuid.uuid4())
    net.add_transition(Transition(transition_id))
    net.transition(transition_id).label(
        name=transition_name,
        transitionType=transition_type,
        group_id=group_id,
    )
    return transition_id

logic

Contains the PetriNetLogic class.

PetriNetLogic(petri_net_generator, draw_net=True, file_name='')

Provides methods for interacting with the generated petri nets for scheduling.

Scheduling of the production process with the help of the generated petri nets is done in this class.

Attributes:

Name Type Description
petri_net_generator PetriNetGenerator

A reference to the PetriNetGenerator.

petri_net PetriNet

A reference to the generated petri net.

draw_net bool

Indicates whether the net should be drawn.

transition_dict Dict

A reference to the dict in the generator which maps the ids to callbacks.

Initialize the object.

Parameters:

Name Type Description Default
petri_net_generator PetriNetGenerator

A reference to the PetriNetGenerator.

required
draw_net bool

Indicates whether the net should be drawn.

True
Source code in pfdl_scheduler/petri_net/logic.py
def __init__(
    self, petri_net_generator: PetriNetGenerator, draw_net: bool = True, file_name: str = ""
):
    """Initialize the object.

    Args:
        petri_net_generator: A reference to the PetriNetGenerator.
        draw_net: Indicates whether the net should be drawn.
    """
    self.petri_net_generator: PetriNetGenerator = petri_net_generator
    self.petri_net: PetriNet = petri_net_generator.net
    self.draw_net: bool = draw_net
    self.transition_dict: Dict = self.petri_net_generator.transition_dict
    self.file_name = file_name
draw_petri_net()

Saves the petri net as an image and as a .dot file under ./temp/ (relative to the current working directory) and appends the call tree to the .dot file. Nothing is written unless draw_net is set.

This method takes no parameters; it uses the petri_net and file_name of the instance.

Source code in pfdl_scheduler/petri_net/logic.py
def draw_petri_net(self) -> None:
    """Saves the petri net as an image and as a `.dot` file under `./temp/`.

    The files are named after `file_name`; the call tree is appended to the
    `.dot` file. Nothing is written unless `draw_net` is set.
    """

    file_path = "./temp/" + self.file_name

    if self.draw_net:
        draw_petri_net(self.petri_net, file_path)
        draw_petri_net(self.petri_net, file_path, ".dot")
        with open(file_path + ".dot", "a") as file:
            file.write("\ncall_tree:")
            file.write(json.dumps(self.petri_net_generator.tree.toJSON(), indent=4))
evaluate_petri_net()

Tries to fire enabled transitions repeatedly until all transitions have been tried and none of them can fire anymore.

Source code in pfdl_scheduler/petri_net/logic.py
def evaluate_petri_net(self) -> None:
    """Tries to fire enabled transitions repeatedly until all transitions
    have been tried and none of them can fire anymore.
    """
    index = 0

    transitions = list(self.petri_net._trans)
    while index < len(transitions):
        transition_id = transitions[index]

        if self.petri_net.transition(transition_id).enabled(Value(1)):
            if transition_id in self.transition_dict:
                callbacks = self.transition_dict[transition_id]
                temp = None

                for callback in callbacks:
                    # parallel loop functionality stop evaluation
                    if callback.func.__name__ == "on_parallel_loop_started":
                        temp = callback
                        callbacks.remove(temp)

                if temp:
                    for callback in list(callbacks):
                        callback()
                        callbacks.remove(callback)
                    temp()
                    return
                else:
                    self.petri_net.transition(transition_id).fire(Value(1))

                    for callback in callbacks:
                        callback()
            else:
                self.petri_net.transition(transition_id).fire(Value(1))

            index = 0
        else:
            index = index + 1

    self.draw_petri_net()
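The scanning strategy used here (restart from the first transition after every successful firing, stop once a full pass finds nothing enabled) can be sketched without a Petri net library. The `make_transition` helper and the `tokens` dict are hypothetical stand-ins for SNAKES transitions and places, and callbacks are left out.

```python
def make_transition(name, source, target, tokens):
    """Build a (name, is_enabled, fire) triple that moves one token."""

    def is_enabled():
        return tokens[source] > 0

    def fire():
        tokens[source] -= 1
        tokens[target] += 1

    return name, is_enabled, fire


def evaluate(transitions):
    """Fire transitions until a complete scan finds none enabled."""
    fired = []
    index = 0
    while index < len(transitions):
        name, is_enabled, fire = transitions[index]
        if is_enabled():
            fire()
            fired.append(name)
            index = 0  # a firing may have enabled an earlier transition
        else:
            index += 1
    return fired


tokens = {"a": 1, "b": 0, "c": 0}
# t2 is listed first on purpose: it only becomes enabled after t1 has fired.
transitions = [
    make_transition("t2", "b", "c", tokens),
    make_transition("t1", "a", "b", tokens),
]
fired = evaluate(transitions)
```

Resetting the index is what lets `t2` fire even though it precedes `t1` in the list.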
fire_event(event)

Adds a token to the corresponding place of the event in the petri net.

Parameters:

Name Type Description Default
event Event

The Event object that is fired.

required
Source code in pfdl_scheduler/petri_net/logic.py
def fire_event(self, event: Event) -> bool:
    """Adds a token to the corresponding place of the event in the petri net.

    Args:
        event: The Event object that is fired.
    """

    name_in_petri_net = ""
    if event.event_type == START_PRODUCTION_TASK:
        name_in_petri_net = self.petri_net_generator.task_started_id
    elif event.event_type == SET_PLACE:
        name_in_petri_net = event.data["place_id"]
    elif event.event_type == SERVICE_FINISHED:
        name_in_petri_net = self.petri_net_generator.place_dict[event.data["service_id"]]

    if self.petri_net.has_place(name_in_petri_net):
        self.petri_net.place(name_in_petri_net).add(1)
        self.draw_petri_net()
        self.evaluate_petri_net()
        return True

    return False

scheduler

Contains the Scheduler class.

ParallelLoopCounter()

Represents an intermediate object which indicates that the counter is from a parallel loop.

Source code in pfdl_scheduler/scheduler.py
def __init__(self):
    self.value = -1

Scheduler(pfdl_file_path, generate_test_ids=False, draw_petri_net=True, scheduler_id='', dashboard_host_address='')

Bases: Subject

Schedules Tasks of a given PFDL file.

The scheduler covers almost the complete execution of a production order, including parsing the PFDL description, creating and validating the model, and executing the petri net. It interacts with the execution engines and informs them about services or tasks which started or finished.

This class implements the Observer pattern and serves as the subject. Observers can be registered with the scheduler and receive updates (e.g. log entries or information about a new petri net image).

Attributes:

Name Type Description
running bool

A boolean that indicates whether the scheduler is running.

pfdl_file_valid bool

A boolean indicating whether the given PFDL file was valid.

process Process

The corresponding Process instance from the PFDL file.

petri_net_generator PetriNetGenerator

A PetriNetGenerator instance for generating the petri net.

petri_net_logic PetriNetLogic

A PetriNetLogic instance for execution of the petri net.

task_callbacks TaskCallbacks

TaskCallbacks instance which holds the registered callbacks.

variable_access_function Callable[[str], str]

The function which will be called when the scheduler needs a variable.

loop_counters Dict[str, Dict[str, int]]

A dict for mapping task ids to the current loop counter (counting loops).

awaited_events List[Event]

A list of awaited Events. Only these events can be passed to the net.

generate_test_ids bool

Indicates whether test ids should be generated.

test_id_counters List[int]

A List consisting of counters for the test ids of tasks and services.

observers List[Observer]

List of Observers used to update them on a notify call.

Initialize the object.

If the given path leads to a valid PFDL file the parsing will be started. If no errors occur the model of the PFDL File will be transformed into a petri net and be drawn if the draw_petri_net flag is set. If generate_test_ids is set the ids of the called tasks and services will be an enumeration starting at 0.

Parameters:

Name Type Description Default
pfdl_file_path str

The path to the PFDL file.

required
generate_test_ids bool

A boolean indicating whether test ids should be generated.

False
draw_petri_net bool

A boolean indicating whether the petri net should be drawn.

True
scheduler_id str

A unique ID to identify the Scheduler / Production Order

''
dashboard_host_address str

The address of the Dashboard (if existing)

''
Source code in pfdl_scheduler/scheduler.py
def __init__(
    self,
    pfdl_file_path: str,
    generate_test_ids: bool = False,
    draw_petri_net: bool = True,
    scheduler_id: str = "",
    dashboard_host_address: str = "",
) -> None:
    """Initialize the object.

    If the given path leads to a valid PFDL file the parsing will be started. If no errors
    occur the model of the PFDL File will be transformed into a petri net and be drawn if
    the `draw_petri_net` flag is set. If `generate_test_ids` is set the ids of the called
    tasks and services will be an enumeration starting at 0.

    Args:
        pfdl_file_path: The path to the PFDL file.
        generate_test_ids: A boolean indicating whether test ids should be generated.
        draw_petri_net: A boolean indicating whether the petri net should be drawn.
        scheduler_id: A unique ID to identify the Scheduler / Production Order
        dashboard_host_address: The address of the Dashboard (if existing)
    """
    if scheduler_id == "":
        self.scheduler_id: str = str(uuid.uuid4())
    else:
        self.scheduler_id: str = scheduler_id
    self.running: bool = False
    self.pfdl_file_valid: bool = False
    self.process: Process = None
    self.petri_net_generator: PetriNetGenerator = None
    self.petri_net_logic: PetriNetLogic = None
    self.task_callbacks: TaskCallbacks = TaskCallbacks()
    self.variable_access_function: Callable[[str], str] = None
    self.loop_counters: Dict[str, Dict[str, int]] = {}
    self.awaited_events: List[Event] = []
    self.generate_test_ids: bool = generate_test_ids
    self.test_id_counters: List[int] = [0, 0]
    self.pfdl_file_valid, self.process, pfdl_string = parse_file(pfdl_file_path)
    if self.pfdl_file_valid:
        self.petri_net_generator = PetriNetGenerator(
            "",
            generate_test_ids=generate_test_ids,
            draw_net=draw_petri_net,
            file_name=self.scheduler_id,
        )
        self.register_for_petrinet_callbacks()

        self.petri_net_generator.generate_petri_net(self.process)
        self.petri_net_logic = PetriNetLogic(
            self.petri_net_generator, draw_petri_net, file_name=self.scheduler_id
        )

    awaited_event = Event(event_type=START_PRODUCTION_TASK, data={})
    self.awaited_events.append(awaited_event)
    self.observers: List[Observer] = []

    # enable logging
    self.attach(LogEntryObserver(self.scheduler_id))

    if dashboard_host_address != "":
        self.attach(DashboardObserver(dashboard_host_address, self.scheduler_id, pfdl_string))
attach(observer)

Attach (add) an observer object to the observers list.

Source code in pfdl_scheduler/scheduler.py
136
137
138
def attach(self, observer: Observer) -> None:
    """Attach (add) an observer object to the observers list."""
    self.observers.append(observer)
check_expression(expression, task_context)

Check the boolean value of the given PFDL expression as a Python expression.

This method is only executed if the semantic error check has passed, so no additional semantic checks have to be performed.

Parameters:

Name Type Description Default
expression Dict

A dict representing the expression.

required
task_context TaskAPI

The context in which the expression should be evaluated.

required

Returns:

Type Description
bool

The value of the successfully executed expression in Python as a bool.

Source code in pfdl_scheduler/scheduler.py
def check_expression(self, expression: Dict, task_context: TaskAPI) -> bool:
    """Check the boolean value of the given PFDL expression as a Python expression.

    This method is only executed if the semantic error check has passed, so no
    additional semantic checks have to be performed.

    Arguments:
        expression: A dict representing the expression.
        task_context: The context in which the expression should be evaluated.

    Returns:
        The value of the successfully executed expression in Python as a bool.
    """
    return bool(self.execute_expression(expression, task_context))
detach(observer)

Detach (remove) an observer object from the observers list.

Source code in pfdl_scheduler/scheduler.py
def detach(self, observer: Observer) -> None:
    """Detach (remove) an observer object from the observers list."""
    self.observers.remove(observer)
execute_expression(expression, task_context)

Executes the given PFDL expression as a Python expression.

Parameters:

Name Type Description Default
expression Dict

A dict representing the expression.

required
task_context TaskAPI

The context in which the expression should be evaluated.

required

Returns:

Type Description
Any

The value of the expression executed in Python (type depends on specific expression).

Source code in pfdl_scheduler/scheduler.py
def execute_expression(self, expression: Dict, task_context: TaskAPI) -> Any:
    """Executes the given PFDL expression as a Python expression.

    Arguments:
        expression: A dict representing the expression.
        task_context: The context in which the expression should be evaluated.

    Returns:
        The value of the expression executed in Python (type depends on specific expression).
    """
    if isinstance(expression, (str, int, float, bool)):
        return expression
    if isinstance(expression, list):
        variable_name = expression[0]
        variable = self.variable_access_function(variable_name, task_context)

        for i in range(1, len(expression)):
            variable = variable.attributes[expression[i]]
        return variable
    if len(expression) == 2:
        return not self.execute_expression(expression["value"], task_context)

    if expression["left"] == "(" and expression["right"] == ")":
        return self.execute_expression(expression["binOp"], task_context)

    left_side = self.execute_expression(expression["left"], task_context)
    right_side = self.execute_expression(expression["right"], task_context)

    op_func = helpers.parse_operator(expression["binOp"])
    return op_func(left_side, right_side)
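A standalone sketch of this evaluation may help when reading the recursion. Several things in it are assumptions: the `OPERATORS` table stands in for `helpers.parse_operator` (whose supported symbols are not shown here), variables are looked up in a plain nested dict instead of through `variable_access_function` and `attributes`, and the key name `unOp` in the sample negation is hypothetical.

```python
import operator

# Assumed operator table; the real mapping lives in helpers.parse_operator.
OPERATORS = {
    "<": operator.lt,
    ">": operator.gt,
    "==": operator.eq,
    "+": operator.add,
}


def evaluate_expression(expression, variables):
    """Evaluate an expression dict against a nested dict of variables."""
    if isinstance(expression, (str, int, float, bool)):
        return expression
    if isinstance(expression, list):
        value = variables[expression[0]]
        for attribute in expression[1:]:
            value = value[attribute]
        return value
    if len(expression) == 2:
        return not evaluate_expression(expression["value"], variables)
    if expression["left"] == "(" and expression["right"] == ")":
        return evaluate_expression(expression["binOp"], variables)
    left_side = evaluate_expression(expression["left"], variables)
    right_side = evaluate_expression(expression["right"], variables)
    return OPERATORS[expression["binOp"]](left_side, right_side)


variables = {"order": {"count": 3}}
comparison = {"left": ["order", "count"], "binOp": "<", "right": 5}
negated = {"unOp": "!", "value": comparison}
bracketed = {"left": "(", "binOp": comparison, "right": ")"}
```

Wrapping the result in `bool(...)`, as `check_expression` does, turns any evaluated value into the truth value used for branching.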
fire_event(event)

Forwards the given Event to the PetriNetLogic instance.

The given Event object will be passed to the petri net if it is an awaited event.

Parameters:

Name Type Description Default
event Event

An Event instance.

required

Returns:

Type Description
bool

True if the event could be fired to the petri net (is an awaited event).

Source code in pfdl_scheduler/scheduler.py
def fire_event(self, event: Event) -> bool:
    """Forwards the given Event to the PetriNetLogic instance.

    The given `Event` object will be passed to the petri net if it is an awaited
    event.

    Args:
        event: An `Event` instance.

    Returns:
        True if the event could be fired to the petri net (is an awaited event).
    """

    if event in self.awaited_events:
        if self.petri_net_logic.fire_event(event):
            self.awaited_events.remove(event)
            self.notify(NotificationType.PETRI_NET, self.scheduler_id)
            return True
    return False
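The gating on awaited events can be sketched in isolation. The `Event` dataclass below is a hypothetical stand-in that only assumes events compare equal when their type and data match (which the `event in self.awaited_events` check relies on); `EventGate` replaces the scheduler and skips the petri net entirely.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical stand-in: equality is based on type and data."""

    event_type: str
    data: dict = field(default_factory=dict)


class EventGate:
    """Accepts an event only while it is in the awaited list."""

    def __init__(self):
        self.awaited_events = []
        self.accepted = []

    def fire_event(self, event):
        if event in self.awaited_events:
            self.awaited_events.remove(event)
            self.accepted.append(event)
            return True
        return False


gate = EventGate()
gate.awaited_events.append(Event("service_finished", {"service_id": "0"}))

first_try = gate.fire_event(Event("service_finished", {"service_id": "0"}))
second_try = gate.fire_event(Event("service_finished", {"service_id": "0"}))
unknown = gate.fire_event(Event("service_finished", {"service_id": "42"}))
```

An event is consumed on acceptance, so firing the same event twice only succeeds once unless it is awaited again.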
notify(notification_type, data)

Trigger an update in each subscriber.

Parameters:

Name Type Description Default
notification_type NotificationType

A NotificationType informs about the type of the data.

required
data Any

The data which the observers will receive.

required
Source code in pfdl_scheduler/scheduler.py
def notify(self, notification_type: NotificationType, data: Any) -> None:
    """Trigger an update in each subscriber.

    Args:
        notification_type: A `NotificationType` informs about the type of the data.
        data: The data which the observers will receive.
    """

    for observer in self.observers:
        observer.update(notification_type, data)
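The attach/notify/detach cycle can be sketched with a minimal subject and a collecting observer. `MiniSubject` and `CollectingObserver` are hypothetical names; the enum member names `PETRI_NET` and `LOG_EVENT` appear in the source above, but their values here are assumptions.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, List


class NotificationType(Enum):
    # Member names appear in the source; the values are assumptions.
    PETRI_NET = 1
    LOG_EVENT = 2


class Observer(ABC):
    @abstractmethod
    def update(self, notification_type: NotificationType, data: Any) -> None:
        """Receive update from subject."""


class CollectingObserver(Observer):
    """Stores every notification it receives."""

    def __init__(self) -> None:
        self.received: List[tuple] = []

    def update(self, notification_type: NotificationType, data: Any) -> None:
        self.received.append((notification_type, data))


class MiniSubject:
    """Keeps a list of observers and forwards notifications to each."""

    def __init__(self) -> None:
        self.observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        self.observers.append(observer)

    def detach(self, observer: Observer) -> None:
        self.observers.remove(observer)

    def notify(self, notification_type: NotificationType, data: Any) -> None:
        for observer in self.observers:
            observer.update(notification_type, data)


subject = MiniSubject()
observer = CollectingObserver()
subject.attach(observer)
subject.notify(NotificationType.LOG_EVENT, "Task started")
subject.detach(observer)
subject.notify(NotificationType.PETRI_NET, "ignored after detach")
```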
on_condition_started(condition, then_uuid, else_uuid, task_context)

Executes Scheduling logic when a Condition statement is started.

Source code in pfdl_scheduler/scheduler.py
def on_condition_started(
    self, condition: Condition, then_uuid: str, else_uuid: str, task_context: TaskAPI
) -> None:
    """Executes Scheduling logic when a Condition statement is started."""
    if self.check_expression(condition.expression, task_context):
        awaited_event = Event(event_type=SET_PLACE, data={"place_id": then_uuid})
        self.awaited_events.append(awaited_event)
        self.fire_event(awaited_event)
    else:
        awaited_event = Event(event_type=SET_PLACE, data={"place_id": else_uuid})
        self.awaited_events.append(awaited_event)
        self.fire_event(awaited_event)
on_counting_loop_started(loop, then_uuid, else_uuid, task_context)

Executes Scheduling logic when a Counting Loop is started.

Source code in pfdl_scheduler/scheduler.py
def on_counting_loop_started(
    self, loop: CountingLoop, then_uuid: str, else_uuid: str, task_context: TaskAPI
) -> None:
    """Executes Scheduling logic when a Counting Loop is started."""

    if self.loop_counters.get(task_context.uuid) is None:
        self.loop_counters[task_context.uuid] = {}

    if self.loop_counters[task_context.uuid].get(loop) is None:
        self.loop_counters[task_context.uuid][loop] = 0
    else:
        self.loop_counters[task_context.uuid][loop] = (
            self.loop_counters[task_context.uuid][loop] + 1
        )

    loop_counter = self.loop_counters[task_context.uuid][loop]
    loop_limit = self.get_loop_limit(loop, task_context)

    if loop_counter < loop_limit:
        awaited_event = Event(event_type=SET_PLACE, data={"place_id": then_uuid})
        self.awaited_events.append(awaited_event)

        self.fire_event(awaited_event)
    else:
        awaited_event = Event(event_type=SET_PLACE, data={"place_id": else_uuid})
        self.awaited_events.append(awaited_event)

        # has to be executed at last
        self.fire_event(awaited_event)
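The counter bookkeeping above can be sketched on its own. The function name `counting_loop_step` and the string results `"then"` and `"else"` are hypothetical; in the scheduler the outcome is a `SET_PLACE` event for `then_uuid` or `else_uuid`.

```python
def counting_loop_step(loop_counters, task_uuid, loop_key, loop_limit):
    """Advance the counter for one loop and report which branch is taken."""
    counters = loop_counters.setdefault(task_uuid, {})
    if loop_key not in counters:
        counters[loop_key] = 0  # first visit starts at zero
    else:
        counters[loop_key] += 1  # every further visit increments
    return "then" if counters[loop_key] < loop_limit else "else"


loop_counters = {}
branches = [
    counting_loop_step(loop_counters, "task-1", "loop-a", 2) for _ in range(3)
]
```

With a limit of 2 the loop body is entered twice, and the third visit takes the exit branch.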
on_parallel_loop_started(loop, task_context, parallelTask, parallel_loop_started, first_transition_id, second_transition_id, node)

Executes Scheduling logic when a Parallel Loop is started.

Source code in pfdl_scheduler/scheduler.py
def on_parallel_loop_started(
    self,
    loop: CountingLoop,
    task_context: TaskAPI,
    parallelTask: Task,
    parallel_loop_started,
    first_transition_id: str,
    second_transition_id: str,
    node: Node,
) -> None:
    """Executes Scheduling logic when a Parallel Loop is started."""
    task_count = self.get_loop_limit(loop, task_context)

    # generate parallel tasks in petri net
    if task_count > 0:
        for i in range(int(task_count)):
            self.petri_net_generator.generate_task_call(
                parallelTask,
                task_context,
                first_transition_id,
                second_transition_id,
                node,
                False,
            )

            if self.loop_counters.get(task_context.uuid) is None:
                self.loop_counters[task_context.uuid] = {}
            self.loop_counters[task_context.uuid][
                loop.counting_variable
            ] = ParallelLoopCounter()
    else:
        self.petri_net_generator.generate_empty_parallel_loop(
            first_transition_id, second_transition_id
        )

    self.petri_net_generator.remove_place_on_runtime(parallel_loop_started)

    # start evaluation of net again
    self.petri_net_logic.evaluate_petri_net()
on_service_finished(service_api)

Executes Scheduling logic when a Service is finished.

Source code in pfdl_scheduler/scheduler.py
def on_service_finished(self, service_api: ServiceAPI) -> None:
    """Executes Scheduling logic when a Service is finished."""
    for callback in self.task_callbacks.service_finished:
        callback(service_api)

    log_entry = (
        "Service "
        + service_api.service.name
        + " with UUID '"
        + service_api.uuid
        + "' finished."
    )
    self.notify(NotificationType.LOG_EVENT, (log_entry, logging.INFO, False))
on_service_started(service_api)

Executes Scheduling logic when a Service is started.

Source code in pfdl_scheduler/scheduler.py
def on_service_started(self, service_api: ServiceAPI) -> None:
    """Executes Scheduling logic when a Service is started."""

    new_uuid = str(uuid.uuid4())
    if self.generate_test_ids:
        new_uuid = str(self.test_id_counters[1])
        self.test_id_counters[1] = self.test_id_counters[1] + 1
    self.petri_net_generator.place_dict[new_uuid] = self.petri_net_generator.place_dict[
        service_api.uuid
    ]

    service_api.uuid = new_uuid
    if service_api.input_parameters:
        service_api.input_parameters = copy.deepcopy(service_api.service.input_parameters)

    awaited_event = Event(event_type=SERVICE_FINISHED, data={"service_id": service_api.uuid})
    self.awaited_events.append(awaited_event)

    self.substitute_loop_indexes(service_api)
    for callback in self.task_callbacks.service_started:
        callback(service_api)

    log_entry = (
        "Service " + service_api.service.name + " with UUID '" + service_api.uuid + "' started."
    )
    self.notify(NotificationType.LOG_EVENT, (log_entry, logging.INFO, False))
on_task_finished(task_api)

Executes Scheduling logic when a Task is finished.

Source code in pfdl_scheduler/scheduler.py
def on_task_finished(self, task_api: TaskAPI) -> None:
    """Executes Scheduling logic when a Task is finished."""
    for callback in self.task_callbacks.task_finished:
        callback(task_api)

    order_finished = False
    if task_api.task.name == "productionTask":
        self.running = False
        order_finished = True
        self.petri_net_logic.draw_petri_net()
        self.notify(NotificationType.PETRI_NET, self.scheduler_id)

    log_entry = "Task " + task_api.task.name + " with UUID '" + task_api.uuid + "' finished."
    self.notify(NotificationType.LOG_EVENT, (log_entry, logging.INFO, order_finished))
on_task_started(task_api)

Executes Scheduling logic when a Task is started.

Source code in pfdl_scheduler/scheduler.py
def on_task_started(self, task_api: TaskAPI) -> None:
    """Executes Scheduling logic when a Task is started."""
    new_uuid = str(uuid.uuid4())
    if self.generate_test_ids:
        new_uuid = str(self.test_id_counters[0])
        self.test_id_counters[0] = self.test_id_counters[0] + 1

    task_api.uuid = new_uuid
    if task_api.task_call:
        task_api.input_parameters = copy.deepcopy(task_api.task_call.input_parameters)

    self.substitute_loop_indexes(task_api)
    for callback in self.task_callbacks.task_started:
        callback(task_api)

    log_entry = "Task " + task_api.task.name + " with UUID '" + task_api.uuid + "' started."
    self.notify(NotificationType.LOG_EVENT, (log_entry, logging.INFO, False))
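The id assignment shared by `on_task_started` and `on_service_started` can be sketched as a small helper. `IdGenerator` and its `kind` argument are hypothetical; the two-element counter list mirrors `test_id_counters`, where index 0 is used for tasks and index 1 for services.

```python
import uuid


class IdGenerator:
    """Returns random UUIDs, or counting ids when test ids are requested."""

    def __init__(self, generate_test_ids=False):
        self.generate_test_ids = generate_test_ids
        self.test_id_counters = [0, 0]  # [tasks, services]

    def next_id(self, kind):
        if not self.generate_test_ids:
            return str(uuid.uuid4())
        index = 0 if kind == "task" else 1
        new_id = str(self.test_id_counters[index])
        self.test_id_counters[index] += 1
        return new_id


test_ids = IdGenerator(generate_test_ids=True)
task_ids = [test_ids.next_id("task"), test_ids.next_id("task")]
service_ids = [test_ids.next_id("service")]
random_id = IdGenerator().next_id("task")
```

Counting ids make runs reproducible, which is useful when a test compares log entries or callback arguments against fixed expectations.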
on_while_loop_started(loop, then_uuid, else_uuid, task_context)

Executes Scheduling logic when a While Loop is started.

Source code in pfdl_scheduler/scheduler.py
def on_while_loop_started(
    self, loop: WhileLoop, then_uuid: str, else_uuid: str, task_context: TaskAPI
) -> None:
    """Executes Scheduling logic when a While Loop is started."""
    if self.check_expression(loop.expression, task_context):
        awaited_event = Event(event_type=SET_PLACE, data={"place_id": then_uuid})
        self.awaited_events.append(awaited_event)
        self.fire_event(awaited_event)
    else:
        awaited_event = Event(event_type=SET_PLACE, data={"place_id": else_uuid})
        self.awaited_events.append(awaited_event)
        self.fire_event(awaited_event)
register_callback_service_finished(callback)

Registers the given callback in the service_finished list.

Parameters:

Name Type Description Default
callback Callable[[ServiceAPI], Any]

Function which will be invoked when a Service is finished.

required

Returns:

Type Description
bool

True if the callback was successfully registered.

Source code in pfdl_scheduler/scheduler.py
def register_callback_service_finished(self, callback: Callable[[ServiceAPI], Any]) -> bool:
    """Registers the given callback in the service_finished list.

    Args:
        callback: Function which will be invoked when a Service is finished.

    Returns:
        True if the callback was successfully registered.
    """
    if callback not in self.task_callbacks.service_finished:
        self.task_callbacks.service_finished.append(callback)
        return True

    print("The given Callback function is already registered!")
    return False
register_callback_service_started(callback)

Registers the given callback in the service_started list.

Parameters:

Name Type Description Default
callback Callable[[ServiceAPI], Any]

Function which will be invoked when a Service is started.

required

Returns:

Type Description
bool

True if the callback was successfully registered.

Source code in pfdl_scheduler/scheduler.py
def register_callback_service_started(self, callback: Callable[[ServiceAPI], Any]) -> bool:
    """Registers the given callback in the service_started list.

    Args:
        callback: Function which will be invoked when a Service is started.

    Returns:
        True if the callback was successfully registered.
    """
    if callback not in self.task_callbacks.service_started:
        self.task_callbacks.service_started.append(callback)
        return True

    print("The given Callback function is already registered!")
    return False
register_callback_task_finished(callback)

Registers the given callback in the task_finished list.

Parameters:

Name Type Description Default
callback Callable[[TaskAPI], Any]

Function which will be invoked when a Task is finished.

required

Returns:

Type Description
bool

True if the callback was successfully registered.

Source code in pfdl_scheduler/scheduler.py
def register_callback_task_finished(self, callback: Callable[[TaskAPI], Any]) -> bool:
    """Registers the given callback in the task_finished list.

    Args:
        callback: Function which will be invoked when a Task is finished.

    Returns:
        True if the callback was successfully registered.
    """
    if callback not in self.task_callbacks.task_finished:
        self.task_callbacks.task_finished.append(callback)
        return True

    print("The given Callback function is already registered!")
    return False
register_callback_task_started(callback)

Registers the given callback in the task_started list.

Parameters:

Name Type Description Default
callback Callable[[TaskAPI], Any]

Function which will be invoked when a Task is started.

required

Returns:

Type Description
bool

True if the callback was successfully registered.

Source code in pfdl_scheduler/scheduler.py
def register_callback_task_started(self, callback: Callable[[TaskAPI], Any]) -> bool:
    """Registers the given callback in the task_started list.

    Args:
        callback: Function which will be invoked when a Task is started.

    Returns:
        True if the callback was successfully registered.
    """
    if callback not in self.task_callbacks.task_started:
        self.task_callbacks.task_started.append(callback)
        return True

    print("The given Callback function is already registered!")
    return False
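
The four register_callback_* methods above share one pattern: a callback is appended only if it is not in the list yet, and the outcome is reported as a bool. The following is a minimal, self-contained sketch of that pattern; `MiniScheduler` and `on_started` are hypothetical stand-ins and not part of pfdl_scheduler.

```python
from typing import Any, Callable, List


class MiniScheduler:
    """Hypothetical stand-in that mirrors the register-once pattern."""

    def __init__(self) -> None:
        self.task_started: List[Callable[[Any], Any]] = []

    def register_callback_task_started(self, callback: Callable[[Any], Any]) -> bool:
        # Append only if the callback is not in the list yet.
        if callback not in self.task_started:
            self.task_started.append(callback)
            return True
        return False


def on_started(task_api: Any) -> None:
    print("started:", task_api)


scheduler = MiniScheduler()
first = scheduler.register_callback_task_started(on_started)   # newly registered
second = scheduler.register_callback_task_started(on_started)  # already registered
```

Because membership is tested with `in`, two separately created lambdas with identical bodies count as different callbacks and would both be registered.
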
register_for_petrinet_callbacks()

Register scheduler callback functions in the petri net.

Source code in pfdl_scheduler/scheduler.py
def register_for_petrinet_callbacks(self) -> None:
    """Register scheduler callback functions in the petri net."""
    callbacks = self.petri_net_generator.callbacks
    callbacks.task_started = self.on_task_started
    callbacks.service_started = self.on_service_started
    callbacks.service_finished = self.on_service_finished
    callbacks.condition_started = self.on_condition_started
    callbacks.while_loop_started = self.on_while_loop_started
    callbacks.counting_loop_started = self.on_counting_loop_started
    callbacks.parallel_loop_started = self.on_parallel_loop_started
    callbacks.task_finished = self.on_task_finished
register_variable_access_function(var_access_func)

Registers the given callback as the variable access function of the Scheduler.

Parameters:

Name Type Description Default
var_access_func Callable[[str], str]

The function which will be called when the scheduler needs a variable.

required
Source code in pfdl_scheduler/scheduler.py
def register_variable_access_function(self, var_access_func: Callable[[str], str]) -> None:
    """Registers the given callback as the variable access function of the Scheduler.

    Args:
        var_access_func: The function which will be called when the scheduler needs a variable.
    """
    self.variable_access_function = var_access_func
start()

Starts the scheduling process for the given PFDL file from the path.

Returns:

Type Description
bool

True if the corresponding PFDL file was valid and the Scheduler could be started.

Source code in pfdl_scheduler/scheduler.py
def start(self) -> bool:
    """Starts the scheduling process for the given PFDL file from the path.

    Returns:
        True if the corresponding PFDL file was valid and the Scheduler could be started.
    """
    if self.pfdl_file_valid:
        self.fire_event(Event(event_type=START_PRODUCTION_TASK, data={}))
        self.running = True
        return True
    return False
substitute_loop_indexes(call_api)

Substitutes loop indexes in service or task call input parameters if present.

Source code in pfdl_scheduler/scheduler.py
def substitute_loop_indexes(self, call_api: Union[ServiceAPI, TaskAPI]) -> None:
    """Substitutes loop indexes in service or task call input parameters if present."""
    if call_api.task_context:
        task_uuid = call_api.task_context.uuid
        if task_uuid in self.loop_counters:
            current_loop_counters = self.loop_counters[task_uuid]

            counter_was_raised = {}
            for i, input_parameter in enumerate(call_api.input_parameters):
                if isinstance(input_parameter, List):
                    for j, element in enumerate(input_parameter):
                        counting_variable = element.replace("[", "").replace("]", "")
                        if counting_variable in current_loop_counters:
                            value = current_loop_counters[counting_variable]
                            if isinstance(value, ParallelLoopCounter):
                                if counting_variable not in counter_was_raised:
                                    current_loop_counters[counting_variable].value += 1
                                    counter_was_raised[counting_variable] = True
                                value = current_loop_counters[counting_variable].value
                            # substitute the counting variable with the value of the ith iteration
                            call_api.input_parameters[i][j] = "[" + str(value) + "]"
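
To illustrate the substitution step, here is a simplified sketch that works on plain lists and an integer counter table. The function name `substitute_indexes`, the sample parameters and the counter name `i` are assumptions made for this example, and the special handling of ParallelLoopCounter values is left out.

```python
from typing import Dict, List, Union


def substitute_indexes(
    input_parameters: List[Union[str, List[str]]], loop_counters: Dict[str, int]
) -> None:
    """Replaces "[name]" elements in nested parameter lists with "[value]" in place."""
    for i, parameter in enumerate(input_parameters):
        if isinstance(parameter, list):
            for j, element in enumerate(parameter):
                # "[i]" becomes "i"; plain attribute names stay unchanged.
                name = element.replace("[", "").replace("]", "")
                if name in loop_counters:
                    input_parameters[i][j] = "[" + str(loop_counters[name]) + "]"


# Hypothetical input parameters: one plain name and one variable access list.
params = ["someStruct", ["cuttingResults", "[i]", "sheet"]]
substitute_indexes(params, {"i": 2})
```

After the call, the access list reads `["cuttingResults", "[2]", "sheet"]`, while the plain string parameter is left untouched.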

scheduling

event

Contains the Event class.

Event(event_type='', data=None)

Data class for controlling the PetriNet instance.

Currently available Events
  • Event(event_type="service_finished", data={"service_id": })

Attributes:

Name Type Description
event_type str

A string representing the type of the event.

data Dict

A dict containing the corresponding data of the event type.

Source code in pfdl_scheduler/scheduling/event.py
def __init__(self, event_type: str = "", data: Dict = None) -> None:
    self.event_type: str = event_type
    self.data: Dict = data
from_json(json_string) classmethod

Creates an Event instance out of the given JSON string.

Parameters:

Name Type Description Default
json_string str

A JSON string describing the Event.

required

Returns:

Type Description
Union[None, Event]

The Event which was created from the JSON string. None if the conversion failed.

Source code in pfdl_scheduler/scheduling/event.py
@classmethod
def from_json(cls, json_string: str) -> Union[None, "Event"]:
    """Creates an Event instance out of the given JSON string.

    Args:
        json_string: A JSON string describing the Event.

    Returns:
        The Event which was created from the JSON string. None if the conversion failed.
    """
    json_dict = json.loads(json_string)
    if "event_type" in json_dict and "data" in json_dict:
        return Event(event_type=json_dict["event_type"], data=json_dict["data"])
    else:
        return None
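
A short sketch of how from_json behaves, using a stand-in `Event` class that reproduces the constructor and classmethod shown above so the example runs on its own. The identifier "service-1" is made up for illustration.

```python
import json
from typing import Dict, Optional


class Event:
    """Stand-in reproducing the constructor and from_json shown above."""

    def __init__(self, event_type: str = "", data: Optional[Dict] = None) -> None:
        self.event_type = event_type
        self.data = data

    @classmethod
    def from_json(cls, json_string: str) -> Optional["Event"]:
        json_dict = json.loads(json_string)
        if "event_type" in json_dict and "data" in json_dict:
            return cls(event_type=json_dict["event_type"], data=json_dict["data"])
        return None


event = Event.from_json('{"event_type": "service_finished", "data": {"service_id": "service-1"}}')
incomplete = Event.from_json('{"event_type": "service_finished"}')  # "data" key is missing
```

Note that None is returned only for well-formed JSON that lacks one of the two keys. Since the call to json.loads is not wrapped in a try block, malformed JSON raises json.JSONDecodeError instead.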

task_callbacks

Contains the TaskCallbacks class.

TaskCallbacks() dataclass

Contains lists of callback functions that were registered in the scheduler.

Attributes:

Name Type Description
task_started List[Callable[[TaskAPI], Any]]

A list of callback functions which get called when a task is started.

service_started List[Callable[[ServiceAPI], Any]]

A list of callback functions which get called when a service is started.

service_finished List[Callable[[ServiceAPI], Any]]

A list of callback functions which get called when a service is finished.

task_finished List[Callable[[TaskAPI], Any]]

A list of callback functions which get called when a task is finished.

Initialize the object.

Source code in pfdl_scheduler/scheduling/task_callbacks.py
def __init__(self):
    """Initialize the object."""
    self.task_started: List[Callable[[TaskAPI], Any]] = []
    self.service_started: List[Callable[[ServiceAPI], Any]] = []
    self.service_finished: List[Callable[[ServiceAPI], Any]] = []
    self.task_finished: List[Callable[[TaskAPI], Any]] = []

utils

dashboard_observer

Contains the start up script for the dashboard.

A program executed in the VS Code extension which has a string containing a PFDL program as input.

DashboardObserver(host, scheduler_id, pfdl_string)

Bases: Observer

DashboardObserver for receiving information about changes of the PetriNet or Scheduling.

The Observer will send a POST request to the dashboard with the data.

Source code in pfdl_scheduler/utils/dashboard_observer.py
def __init__(self, host: str, scheduler_id: str, pfdl_string: str) -> None:
    self.host: str = host
    self.scheduler_id: str = scheduler_id
    current_timestamp: int = int(round(datetime.timestamp(datetime.now())))
    self.starting_date: int = current_timestamp
    self.pfdl_string: str = pfdl_string
    self.order_finished: bool = False

    threading.Thread(target=send_post_requests, daemon=True).start()

    request_data = {
        "order_id": scheduler_id,
        "starting_date": current_timestamp,
        "last_update": current_timestamp,
        "status": ORDER_STARTED,
        "pfdl_string": self.pfdl_string,
    }

    message_queue.put((self.host + ORDER_ROUTE, request_data))
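
The constructor hands its request to a queue that a daemon thread consumes. The bodies of `send_post_requests` and `message_queue` are not shown in this section, so the following is only a sketch of how such a producer/consumer pair could look. The URL, the `delivered` list and the `None` sentinel are assumptions added so the example can run and terminate without a dashboard.

```python
import queue
import threading
from typing import Any, Dict, List, Tuple

message_queue: queue.Queue = queue.Queue()
delivered: List[Tuple[str, Dict[str, Any]]] = []


def send_post_requests() -> None:
    """Hypothetical worker: takes (url, payload) tuples from the queue."""
    while True:
        item = message_queue.get()
        if item is None:  # sentinel added for this sketch so the worker can stop
            break
        url, payload = item
        # A real worker would issue the HTTP POST here; the sketch only records it.
        delivered.append((url, payload))


worker = threading.Thread(target=send_post_requests, daemon=True)
worker.start()

# The producer side only enqueues and never blocks on network I/O.
message_queue.put(("http://localhost:8000/orders", {"order_id": "scheduler-1"}))
message_queue.put(None)
worker.join()
```

Decoupling the observer from the network in this way keeps the scheduler responsive even when the dashboard is slow or unreachable.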

helpers

Helper functions used in the project (especially in the SemanticErrorChecker).

check_type_of_value(value, value_type)

Checks if the given value is the given type in the DSL.

Returns:

Type Description
bool

True if the value is of the given value type.

Source code in pfdl_scheduler/utils/helpers.py
def check_type_of_value(value: Any, value_type: str) -> bool:
    """Checks if the given value is the given type in the DSL.

    Returns:
        True if the value is of the given value type.
    """
    if value_type == "number":
        # bool is a subclass of int so check it before
        if isinstance(value, bool):
            return False
        return isinstance(value, (int, float))
    if value_type == "boolean":
        return isinstance(value, bool)
    if value_type == "string":
        return isinstance(value, str)
    if isinstance(value, Struct):
        return value.name == value_type
    # value was a string
    return True
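
The explicit bool check in the "number" branch matters because bool is a subclass of int in Python. A small sketch of just that branch follows; `is_dsl_number` is a hypothetical name used for this example.

```python
from typing import Any


def is_dsl_number(value: Any) -> bool:
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(value, bool):
        return False
    return isinstance(value, (int, float))


# Without the guard, a boolean would pass as a number.
naive = isinstance(True, (int, float))
```
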
get_type_of_variable_list(var_list, task, struct_definitions)

Iterates over the given variable list and gets the type of the last element.

Returns:

Type Description
str

Type of the last element in the variable list as string.

Source code in pfdl_scheduler/utils/helpers.py
def get_type_of_variable_list(
    var_list: List[str], task: Task, struct_definitions: Dict[str, Struct]
) -> str:
    """Iterates over the given variable list and gets the type of the last element.

    Returns:
        Type of the last element in the variable list as string.
    """
    current_struct = struct_definitions[task.variables[var_list[0]]]
    for i in range(1, len(var_list) - 1):
        current_struct = struct_definitions[current_struct.attributes[var_list[i]]]
    variable_type = current_struct.attributes[var_list[len(var_list) - 1]]
    return variable_type
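
The lookup can be traced with a small example. The `Struct` and `Task` classes below are stand-ins that carry only the fields the helper reads, and the struct names "Order" and "Sheet" are invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Struct:  # stand-in with only the field the helper reads
    attributes: Dict[str, str] = field(default_factory=dict)


@dataclass
class Task:  # stand-in with only the field the helper reads
    variables: Dict[str, str] = field(default_factory=dict)


def get_type_of_variable_list(
    var_list: List[str], task: Task, struct_definitions: Dict[str, Struct]
) -> str:
    # The first element is a task variable whose type names a struct.
    current_struct = struct_definitions[task.variables[var_list[0]]]
    # Every element between the first and the last selects a nested struct.
    for name in var_list[1:-1]:
        current_struct = struct_definitions[current_struct.attributes[name]]
    # The last element is looked up as an attribute of the innermost struct.
    return current_struct.attributes[var_list[-1]]


structs = {
    "Order": Struct({"sheet": "Sheet"}),
    "Sheet": Struct({"width": "number"}),
}
task = Task({"order": "Order"})
result = get_type_of_variable_list(["order", "sheet", "width"], task, structs)
```

Here `order.sheet.width` resolves to "number". The helper performs plain dictionary lookups, so an unknown variable or attribute raises a KeyError. Callers are therefore expected to validate the access first.
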
is_boolean(string)

Checks if the given string can be cast to a boolean.

Returns:

Type Description
bool

True if the given string can be cast to a boolean.

Source code in pfdl_scheduler/utils/helpers.py
def is_boolean(string: str) -> bool:
    """Checks if the given string can be cast to a boolean.

    Returns:
        True if the given string can be cast to a boolean.
    """
    if string in ("true", "false"):
        return True
    return False
is_con(string)

Checks if the given string is a condition element in the PFDL.

A condition element can be a PFDL string, boolean or number.

Returns:

Type Description
bool

True if the given string is a condition element in the PFDL.

Source code in pfdl_scheduler/utils/helpers.py
def is_con(string: str) -> bool:
    """Checks if the given string is a condition element in the PFDL.

    A condition element can be a PFDL string, boolean or number.

    Returns:
        True if the given string is a condition element in the PFDL.
    """
    return is_string(string) or is_boolean(string) or is_number(string)
is_float(string)

Checks if the given string can be cast to a float.

Returns:

Type Description
bool

True if the given string can be cast to a float.

Source code in pfdl_scheduler/utils/helpers.py
def is_float(string: str) -> bool:
    """Checks if the given string can be cast to a float.

    Returns:
        True if the given string can be cast to a float.
    """
    try:
        float(string)
    except (TypeError, ValueError):
        return False
    else:
        return True
is_int(string)

Checks if the given string can be cast to an integer.

Returns:

Type Description
bool

True if the given string can be cast to an integer.

Source code in pfdl_scheduler/utils/helpers.py
def is_int(string: str) -> bool:
    """Checks if the given string can be cast to an integer.

    Returns:
        True if the given string can be cast to an integer.
    """
    try:
        int(string)
    except (TypeError, ValueError):
        return False
    else:
        return True
is_number(string)

Checks if the given string can be cast to a number (int or float).

Returns:

Type Description
bool

True if the given string can be cast to a number.

Source code in pfdl_scheduler/utils/helpers.py
def is_number(string: str) -> bool:
    """Checks if the given string can be cast to a number (int or float).

    Returns:
        True if the given string can be cast to a number.
    """
    if is_float(string) or is_int(string):
        return True
    return False
is_string(string)

Check if the given parameter is a string in the DSL: It should start and end with '"'.

Returns:

Type Description
bool

True if the given string is a string in the DSL.

Source code in pfdl_scheduler/utils/helpers.py
def is_string(string: str) -> bool:
    """Check if the given parameter is a string in the DSL: It should start and end with '"'.

    Returns:
        True if the given string is a string in the DSL.
    """
    if isinstance(string, str) and string.startswith('"') and string.endswith('"'):
        return True
    return False
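
The predicates above (is_boolean, is_float, is_int, is_string and their combination in is_con) can be exercised together. The sketch below re-implements them in condensed form so it runs without the package. The sample strings are chosen for illustration only.

```python
def is_float(string) -> bool:
    try:
        float(string)
    except (TypeError, ValueError):
        return False
    return True


def is_int(string) -> bool:
    try:
        int(string)
    except (TypeError, ValueError):
        return False
    return True


def is_boolean(string) -> bool:
    # Only the lowercase DSL literals are accepted.
    return string in ("true", "false")


def is_string(string) -> bool:
    return isinstance(string, str) and string.startswith('"') and string.endswith('"')


def is_con(string) -> bool:
    return is_string(string) or is_boolean(string) or is_float(string) or is_int(string)


samples = ['"steel"', "true", "True", "4.2", "1e3", "width"]
results = {text: is_con(text) for text in samples}
```

Two details are easy to miss: the boolean check is case-sensitive, so "True" is not a condition element, and float() accepts scientific notation such as "1e3".
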
parse_operator(op)

Parses a PFDL operator given as a string into the matching function of Python's operator module.

Source code in pfdl_scheduler/utils/helpers.py
def parse_operator(op: str) -> operator:
    """Parses a PFDL operator string into the matching function of the operator module."""
    ops = {
        ">": operator.gt,
        ">=": operator.ge,
        "<": operator.lt,
        "<=": operator.le,
        "==": operator.eq,
        "!=": operator.ne,
        "And": operator.and_,
        "Or": operator.or_,
        "+": operator.add,
        "-": operator.sub,
        "*": operator.mul,
        "/": operator.truediv,
    }
    return ops[op]
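
A brief sketch of how the returned functions behave, using an excerpt of the table above. The operand values are arbitrary.

```python
import operator

# Excerpt of the operator table from parse_operator.
ops = {
    ">=": operator.ge,
    "And": operator.and_,
    "/": operator.truediv,
}

greater_equal = ops[">="](3, 2)
both = ops["And"](True, False)
quotient = ops["/"](1, 2)
```

operator.and_ and operator.or_ are the function forms of `&` and `|`, so both operands are always evaluated; there is no short-circuiting as with the `and` and `or` keywords. An operator string that is not in the table raises a KeyError.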

log_entry_observer

This module provides the LogEntryObserver which implements the Observer pattern.

The scheduler notifies its observers about log entries, so this class is used to catch these updates and log them into a file.

LogEntryObserver(scheduler_id)

Bases: Observer

LogEntryObserver for receiving logging information from the Scheduler.

LogLevels are based on https://docs.python.org/3/library/logging.html#logging-levels

Source code in pfdl_scheduler/utils/log_entry_observer.py
def __init__(self, scheduler_id: str):
    logging.basicConfig(
        filename=LOG_FILE_LOCATION + scheduler_id + LOG_FILE_FORMAT,
        encoding=LOG_FILE_ENCODING,
        level=logging.DEBUG,
        filemode=LOG_FILE_FILEMODE,
    )
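
The update method of the observer is not shown in this section. As a rough sketch of how a LOG_EVENT notification could be consumed, the example below unpacks the (log_entry, log_level, order_finished) tuple that the scheduler passes to notify. `SketchLogObserver`, `ListHandler`, the logger name and the enum values are assumptions, and an in-memory handler replaces the log file.

```python
import logging
from enum import Enum
from typing import Any, List, Tuple


class NotificationType(Enum):  # stand-in; the member values are assumptions
    PETRI_NET = 1
    LOG_EVENT = 2


class ListHandler(logging.Handler):
    """Collects records in memory so the sketch needs no log file."""

    def __init__(self) -> None:
        super().__init__()
        self.entries: List[Tuple[int, str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.entries.append((record.levelno, record.getMessage()))


class SketchLogObserver:
    """Hypothetical observer; only the tuple layout is taken from the scheduler code."""

    def __init__(self, scheduler_id: str) -> None:
        self.handler = ListHandler()
        self.logger = logging.getLogger("pfdl_sketch." + scheduler_id)
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logger.addHandler(self.handler)

    def update(self, notification_type: NotificationType, data: Any) -> None:
        if notification_type == NotificationType.LOG_EVENT:
            log_entry, log_level, _order_finished = data
            self.logger.log(log_level, log_entry)


observer = SketchLogObserver("scheduler-1")
observer.update(NotificationType.LOG_EVENT, ("Task productionTask started.", logging.INFO, False))
observer.update(NotificationType.PETRI_NET, "scheduler-1")  # ignored by this observer
```

When using logging.basicConfig as in the constructor above, keep in mind that its `encoding` argument is only available from Python 3.9 onwards.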

parsing_utils

Contains functions which are used to load and parse PFDL files.

load_file(file_path)

Loads the content of the file from the given path.

Returns:

Type Description
str

The content of the file as a string.

Source code in pfdl_scheduler/utils/parsing_utils.py
def load_file(file_path: str) -> str:
    """Loads the content of the file from the given path.

    Returns:
        The content of the file as a string.
    """
    pfdl_string = ""
    with open(file_path, "r", encoding="utf-8") as file:
        pfdl_string = file.read()
    return pfdl_string
parse_file(file_path)

Loads the content of the file from the given path and calls the parse_string function.

Parameters:

Name Type Description Default
file_path str

The path to the PFDL file.

required

Returns:

Type Description
Tuple[bool, Union[None, Process], str]

A boolean indicating the validity of the PFDL file, the Process object (None if syntax errors prevented its creation), and the content of the file.

Source code in pfdl_scheduler/utils/parsing_utils.py
def parse_file(file_path: str) -> Tuple[bool, Union[None, Process], str]:
    """Loads the content of the file from the given path and calls the parse_string function.

    Args:
        file_path: The path to the PFDL file.

    Returns:
        A boolean indicating the validity of the PFDL file, the Process object
        (None if syntax errors prevented its creation), and the content of the file.
    """
    pfdl_string = load_file(file_path)
    return *parse_string(pfdl_string, file_path), pfdl_string
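
The return statement unpacks the pair from parse_string and appends the file content, so the resulting tuple is ordered (valid, process, content). The sketch below shows this with hypothetical stub functions; the stub logic and the sample text are placeholders, not real PFDL handling.

```python
from typing import Optional, Tuple


def parse_string_stub(pfdl_string: str) -> Tuple[bool, Optional[str]]:
    # Hypothetical stand-in: treats every non-empty string as valid.
    return (True, "process") if pfdl_string else (False, None)


def parse_file_stub(pfdl_string: str) -> Tuple[bool, Optional[str], str]:
    # Same shape as the return statement above: unpack the pair, then append the text.
    return (*parse_string_stub(pfdl_string), pfdl_string)


valid, process, content = parse_file_stub("example PFDL text")
```

The sketch parenthesizes the tuple for portability. The unparenthesized form `return *a, b` used in the listing is accepted by Python 3.8 and later.
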
parse_string(pfdl_string, file_path='', used_in_extension=False)

Instantiates the ANTLR lexer and parser and parses the given PFDL string.

Parameters:

Name Type Description Default
pfdl_string str

A string containing the content of a PFDL file.

required
file_path str

The path of the PFDL file (used for error messages).

''
used_in_extension bool

A boolean indicating if the function is called from the extension.

False

Returns:

Type Description
Tuple[bool, Union[None, Process]]

A boolean indicating the validity of the PFDL file, and the Process object (None if syntax errors prevented its creation).

Source code in pfdl_scheduler/utils/parsing_utils.py
def parse_string(
    pfdl_string: str, file_path: str = "", used_in_extension: bool = False
) -> Tuple[bool, Union[None, Process]]:
    """Instantiates the ANTLR lexer and parser and parses the given PFDL string.

    Arguments:
        pfdl_string: A string containing the content of a PFDL file.
        file_path: The path of the PFDL file (used for error messages).
        used_in_extension: A boolean indicating if the function is called from the extension.

    Returns:
        A boolean indicating the validity of the PFDL file, and the Process object
        (None if syntax errors prevented its creation).
    """
    lexer = PFDLLexer(InputStream(pfdl_string))
    lexer.removeErrorListeners()

    token_stream = CommonTokenStream(lexer)

    parser = PFDLParser(token_stream)
    parser.removeErrorListeners()

    error_handler = ErrorHandler(file_path, used_in_extension)
    error_listener = SyntaxErrorListener(token_stream, error_handler)
    parser.addErrorListener(error_listener)

    tree = parser.program()

    if error_handler.has_error() is False:
        visitor = PFDLTreeVisitor(error_handler)
        process = visitor.visit(tree)

        semantic_error_checker = SemanticErrorChecker(error_handler, process)
        semantic_error_checker.validate_process()

        if error_handler.has_error() is False:
            return (True, process)
        return (False, process)
    return (False, None)
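
The code above has three possible outcomes: (True, process) for a valid file, (False, process) when errors were reported after the syntax check passed, and (False, None) when syntax errors stopped processing before a Process was built. A caller could distinguish them roughly as follows; `describe_parse_result` and its return strings are hypothetical.

```python
from typing import Any, Optional


def describe_parse_result(valid: bool, process: Optional[Any]) -> str:
    """Maps the (valid, process) pair returned by parse_string to a short label."""
    if valid:
        return "valid"
    if process is not None:
        # A Process was built, but errors were reported afterwards.
        return "errors after parsing"
    return "syntax errors"


labels = [
    describe_parse_result(True, object()),
    describe_parse_result(False, object()),
    describe_parse_result(False, None),
]
```
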
write_tokens_to_file(token_stream)

Writes the given ANTLR CommonTokenStream into a file named 'token.txt'.

Source code in pfdl_scheduler/utils/parsing_utils.py
def write_tokens_to_file(token_stream: CommonTokenStream) -> None:
    """Writes the given ANTLR CommonTokenStream into a file named 'token.txt'."""
    Path("./temp").mkdir(parents=True, exist_ok=True)
    with open("temp/token.txt", "w", encoding="utf-8") as file:
        pattern = re.compile("\r?\n.*")
        for token in token_stream.tokens:
            token_text = token.text
            if re.match(pattern, token.text):
                token_text = "NL"
            file.write(token_text + "\n")

validation

error_handler

Contains the ErrorHandler class.

ErrorHandler(file_path, used_in_extension)

Keeps track of the total number of errors in a PFDL file.

Provides a method for printing an error, which also counts it.

Attributes:

Name Type Description
total_error_count int

Total number of errors.

syntax_error_count int

Number of syntax errors.

semantic_error_count int

Number of static semantic errors.

file_path str

The file path to the PFDL file.

used_in_extension bool

A boolean indicating if the Generator is used within the extension.

Initialize the object.

Parameters:

Name Type Description Default
file_path str

The file path to the PFDL file.

required
used_in_extension bool

A boolean indicating if the Generator is used within the extension.

required
Source code in pfdl_scheduler/validation/error_handler.py
def __init__(self, file_path: str, used_in_extension: bool) -> None:
    """Initialize the object.

    Args:
        file_path: The file path to the PFDL file.
        used_in_extension: A boolean indicating if the Generator is used within the extension.
    """
    self.total_error_count: int = 0
    self.syntax_error_count: int = 0
    self.semantic_error_count: int = 0
    self.file_path: str = file_path
    self.used_in_extension: bool = used_in_extension
has_error()

Returns true if the total error_count is greater than zero.

Source code in pfdl_scheduler/validation/error_handler.py
def has_error(self) -> bool:
    """Returns true if the total error_count is greater than zero."""
    return self.total_error_count > 0
print_error(error_msg, line=0, column=0, off_symbol_length=0, context=None, syntax_error=False)

Prints an error into the standard output.

Parameters:

Name Type Description Default
error_msg str

A string containing the error message

required
line int

The line in which the error occurred

0
column int

The column in which the error occurred

0
off_symbol_length int

Length of the offending symbol

0
context ParserRuleContext

ANTLR Context object (lines and column will be used from this if not None)

None
syntax_error bool

A boolean indicating whether the error is a syntax error or not.

False
Source code in pfdl_scheduler/validation/error_handler.py
def print_error(
    self,
    error_msg: str,
    line: int = 0,
    column: int = 0,
    off_symbol_length: int = 0,
    context: ParserRuleContext = None,
    syntax_error: bool = False,
) -> None:
    """Prints an error into the standard output.

    Args:
        error_msg: A string containing the error message
        line: The line in which the error occurred
        column: The column in which the error occurred
        off_symbol_length: Length of the offending symbol
        context: ANTLR Context object (lines and column will be used from this if not None)
        syntax_error: A boolean indicating whether the error is a syntax error or not.
    """
    if context:
        line = context.start.line
        column = context.start.column - 1
        off_symbol_length = len(context.start.text)
    if self.used_in_extension:
        print(error_msg)
        print(line)
        print(column + 1)  # for ext: antlr starts at column 0, LSP at column 1
        print(off_symbol_length)
    else:
        print(error_msg)
        print("File " + self.file_path + ", in line " + str(line) + ":" + str(column))

    if syntax_error:
        self.syntax_error_count = self.syntax_error_count + 1
    else:
        self.semantic_error_count = self.semantic_error_count + 1

    self.total_error_count = self.total_error_count + 1
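
The counting behaviour of print_error and has_error can be summarised in a reduced sketch. `MiniErrorHandler` is a hypothetical stand-in that drops the position and extension handling and keeps only the counters. The error messages are examples.

```python
class MiniErrorHandler:
    """Hypothetical stand-in that keeps only the counting logic."""

    def __init__(self) -> None:
        self.total_error_count = 0
        self.syntax_error_count = 0
        self.semantic_error_count = 0

    def print_error(self, error_msg: str, syntax_error: bool = False) -> None:
        print(error_msg)
        # Every printed error is counted once per category and once in total.
        if syntax_error:
            self.syntax_error_count += 1
        else:
            self.semantic_error_count += 1
        self.total_error_count += 1

    def has_error(self) -> bool:
        return self.total_error_count > 0


handler = MiniErrorHandler()
clean = handler.has_error()
handler.print_error("mismatched input", syntax_error=True)
handler.print_error("Unknown variable 'x'.")
```

Since errors are counted as semantic unless `syntax_error=True` is passed, callers that report syntax errors have to set the flag explicitly.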

semantic_error_checker

Contains the SemanticErrorChecker class.

SemanticErrorChecker(error_handler, process)

Checks for static semantic errors in a given Process object.

Each of the methods in this class checks for a specific semantic error. After calling the validate_process method the entire given process object and all the model objects it contains are tested for errors.

Attributes:

Name Type Description
error_handler ErrorHandler

ErrorHandler instance for printing errors.

process Process

The Process object which has to be validated.

tasks Dict[str, Task]

A Dict that contains all Task objects of the given process object.

structs Dict[str, Struct]

A Dict that contains all Struct objects of the given process object.

Initialize the object.

Parameters:

Name Type Description Default
error_handler ErrorHandler

ErrorHandler instance for printing errors.

required
process Process

The Process object which has to be validated.

required
Source code in pfdl_scheduler/validation/semantic_error_checker.py
def __init__(self, error_handler: ErrorHandler, process: Process) -> None:
    """Initialize the object.

    Args:
        error_handler: ErrorHandler instance for printing errors.
        process: The Process object which has to be validated.
    """
    self.error_handler: ErrorHandler = error_handler
    self.process: Process = process
    self.tasks: Dict[str, Task] = process.tasks
    self.structs: Dict[str, Struct] = process.structs
check_array(instantiated_array, array_definition)

Calls check methods to validate the instantiated array.

Returns:

Type Description
bool

True if the given array is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_array(self, instantiated_array: Array, array_definition: Array) -> bool:
    """Calls check methods to validate the instantiated array.

    Returns:
        True if the given array is valid.
    """
    error_msg = ""
    element_type = array_definition.type_of_elements
    for value in instantiated_array.values:
        # type of Struct not checked yet
        if isinstance(value, Struct):
            if value.name == "":
                value.name = array_definition.type_of_elements
            if not self.check_instantiated_struct_attributes(value):
                return False
        if not helpers.check_type_of_value(value, element_type):
            error_msg = (
                f"Array has elements that does not match "
                f"with the defined type '{element_type}'"
            )
            self.error_handler.print_error(error_msg, context=instantiated_array.context)
            return False

    if not self.instantiated_array_length_correct(instantiated_array, array_definition):
        error_msg = "Length of the defined array and the instantiated do not match"
        self.error_handler.print_error(error_msg, context=instantiated_array.context)
        return False

    return True
check_attribute_access(variable_list, context, task)

Checks if the attribute access via a variable list (for example x.y.z) is valid.

Returns:

Type Description
bool

True if the attribute access is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_attribute_access(
    self, variable_list: List[str], context: ParserRuleContext, task: Task
) -> bool:
    """Checks if the attribute access via a variable list (for example x.y.z) is valid.

    Returns:
        True if the attribute access is valid.
    """
    variable = variable_list[0]
    if variable in task.variables and task.variables[variable] in self.structs:
        struct = self.structs[task.variables[variable]]
        predecessor = struct
        for i in range(1, len(variable_list)):
            attribute = variable_list[i]
            if not (attribute.startswith("[") and attribute.endswith("]")):
                if attribute not in predecessor.attributes:
                    error_msg = f"Struct '{predecessor.name}' has no attribute '{attribute}'"
                    self.error_handler.print_error(error_msg, context=context)
                    return False
                if i < len(variable_list) - 1:
                    # check if this attribute is an array (next element is [])
                    if not (
                        variable_list[i + 1].startswith("[")
                        and variable_list[i + 1].endswith("]")
                    ):
                        if predecessor.attributes[attribute] not in self.structs:
                            error_msg = f"Attribute '{attribute}' is not a Struct"
                            self.error_handler.print_error(error_msg, context=context)
                            return False
                        predecessor = self.structs[predecessor.attributes[attribute]]
                    else:
                        predecessor = self.structs[
                            predecessor.attributes[attribute].type_of_elements
                        ]
    else:
        error_msg = f"Unknown variable '{variable}'."
        self.error_handler.print_error(error_msg, context=context)
        return False
    return True
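The traversal above can be illustrated with a simplified, self-contained sketch. The `StructDef` and `ArrayType` classes and the `find_access_error` function are hypothetical stand-ins and not part of pfdl_scheduler; the sketch only mirrors the walk over the variable list and returns an error message instead of calling the error handler.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union


@dataclass
class ArrayType:
    """Hypothetical stand-in for the scheduler's Array type."""
    type_of_elements: str


@dataclass
class StructDef:
    """Hypothetical stand-in for a Struct definition."""
    name: str
    attributes: Dict[str, Union[str, ArrayType]] = field(default_factory=dict)


def is_index(element: str) -> bool:
    """Return True for index elements such as '[0]'."""
    return element.startswith("[") and element.endswith("]")


def find_access_error(
    variable_list: List[str],
    variables: Dict[str, str],
    structs: Dict[str, StructDef],
) -> Optional[str]:
    """Return an error message for an invalid access chain, or None if it is valid."""
    variable = variable_list[0]
    if variable not in variables or variables[variable] not in structs:
        return f"Unknown variable '{variable}'."
    predecessor = structs[variables[variable]]
    for i in range(1, len(variable_list)):
        attribute = variable_list[i]
        if is_index(attribute):
            continue  # index elements are skipped
        if attribute not in predecessor.attributes:
            return f"Struct '{predecessor.name}' has no attribute '{attribute}'"
        if i < len(variable_list) - 1:
            attribute_type = predecessor.attributes[attribute]
            if is_index(variable_list[i + 1]):
                # the next element is an index, so the attribute is treated as an array
                predecessor = structs[attribute_type.type_of_elements]
            elif isinstance(attribute_type, str) and attribute_type in structs:
                predecessor = structs[attribute_type]
            else:
                return f"Attribute '{attribute}' is not a Struct"
    return None


# Hypothetical example data
structs = {
    "Order": StructDef("Order", {"id": "number", "items": ArrayType("Item")}),
    "Item": StructDef("Item", {"weight": "number"}),
}
variables = {"order": "Order"}
```

With this data, the chain `["order", "items", "[0]", "weight"]` is accepted, while `["order", "id", "value"]` is rejected because `id` is a number and has no attributes of its own.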
check_binary_operation(expression, context, task)

Checks if a binary expression is a valid expression.

Returns:

Type Description
bool

True if the given binary expression is a valid expression.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_binary_operation(self, expression, context: ParserRuleContext, task: Task) -> bool:
    """Checks if a binary expression is a valid expression.

    Returns:
        True if the given binary expression is a valid expression.
    """
    left = expression["left"]
    right = expression["right"]

    if expression["binOp"] in ["<", ">", "<=", ">="]:
        # Check if left and right side represent numbers or strings
        if self.expression_is_number(left, task) and self.expression_is_number(right, task):
            return True
        if self.expression_is_string(left, task) and self.expression_is_string(right, task):
            return True

        msg = "Types of Right and left side of the comparison dont match"
        self.error_handler.print_error(msg, context=context)
        return False
    if expression["binOp"] in ["*", "/", "+", "-"]:
        if self.expression_is_number(left, task) and self.expression_is_number(right, task):
            return True
        msg = "Right and left side have to be numbers when using arithmetic operators"
        self.error_handler.print_error(msg, context=context)
        return False
    if left == "(" and right == ")":
        return self.check_expression(expression["binOp"], context, task)

    # expression is either 'and', 'or', '==' or '!='
    return self.check_expression(left, context, task) and self.check_expression(
        right, context, task
    )
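The dictionary shape that this method works on (keys `left`, `binOp` and `right`, with a parenthesized expression stored under `binOp`) can be explored with the following minimal sketch. It is an assumption-laden simplification: `is_number` and `is_valid` are hypothetical helpers that only look at literal values, whereas the scheduler's own `expression_is_number` and `expression_is_string` also receive the Task.

```python
from typing import Any

# Tuples instead of sets: "binOp" may hold a dict, which is not hashable
COMPARISON_OPS = ("<", ">", "<=", ">=")
ARITHMETIC_OPS = ("*", "/", "+", "-")


def is_number(value: Any) -> bool:
    """Hypothetical literal check; bool is excluded because it subclasses int."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def is_valid(expression: Any) -> bool:
    """Simplified validity check for literals and nested binary expressions."""
    if not isinstance(expression, dict):
        return True  # a single literal is accepted as-is in this sketch
    left, op, right = expression["left"], expression["binOp"], expression["right"]
    if op in COMPARISON_OPS:
        both_numbers = is_number(left) and is_number(right)
        both_strings = isinstance(left, str) and isinstance(right, str)
        return both_numbers or both_strings
    if op in ARITHMETIC_OPS:
        return is_number(left) and is_number(right)
    if left == "(" and right == ")":
        return is_valid(op)  # parenthesized: the inner expression sits in "binOp"
    # remaining operators: 'and', 'or', '==', '!='
    return is_valid(left) and is_valid(right)


# (1 > 0), stored the way a parenthesized expression is represented above
paren = {"left": "(", "binOp": {"left": 1, "binOp": ">", "right": 0}, "right": ")"}
```

For example, `is_valid({"left": paren, "binOp": "and", "right": True})` recurses into both operands, while a comparison between a number and a string is rejected directly.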
check_call_input_parameters(called_entity, task)

Checks if the input parameters of a Service or Task Call are valid.

Parameters:

Name Type Description Default
called_entity Union[Service, TaskCall]

The invoked call (service or task call).

required

Returns:

Type Description
bool

True if the input parameters of a Service or Task Call are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_call_input_parameters(
    self, called_entity: Union[Service, TaskCall], task: Task
) -> bool:
    """Checks if the input parameters of a Service or Task Call are valid.

    Args:
        called_entity: The invoked call (service or task call).

    Returns:
        True if the input parameters of a Service or Task Call are valid.
    """
    valid = True

    for input_parameter in called_entity.input_parameters:
        if isinstance(input_parameter, Struct):
            if not self.check_instantiated_struct_attributes(input_parameter):
                valid = False
        elif isinstance(input_parameter, list):
            if not self.check_attribute_access(
                input_parameter, called_entity.context_dict[IN_KEY], task
            ):
                valid = False
        elif not input_parameter in task.variables:
            error_msg = (
                f"An unknown variable '{input_parameter}' is used as input of"
                f" {type(called_entity).__name__} '{called_entity.name}'"
            )
            self.error_handler.print_error(error_msg, context=called_entity.context)
            valid = False
    return valid
check_call_output_parameters(called_entity)

Checks if the output parameters of a Service or Task Call are valid.

The output parameters of a call consist only of the visible variable name and the type of the variable, so the only thing to check is the variable definition. This method simply calls the check_if_variable_definition_is_valid method for all output parameters.

Parameters:

Name Type Description Default
called_entity Union[Service, TaskCall]

The invoked call (service or task call).

required

Returns:

Type Description
bool

True if the output parameters of a Service or Task Call are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_call_output_parameters(self, called_entity: Union[Service, TaskCall]) -> bool:
    """Checks if the output parameters of a Service or Task Call are valid.

    The output parameters of a call consist only of the visible variable name and
    the type of the variable, so the only thing to check is the variable definition.
    This method simply calls the `check_if_variable_definition_is_valid` method for all
    output parameters.

    Args:
        called_entity: The invoked call (service or task call).

    Returns:
        True if the output parameters of a Service or Task Call are valid.
    """
    valid = True
    for identifier, variable_type in called_entity.output_parameters.items():
        if not self.check_if_variable_definition_is_valid(
            identifier, variable_type, called_entity.context
        ):
            valid = False
    return valid
check_call_parameters(called_entity, task)

Checks if the parameters of a Service or Task call are valid (input and output).

Parameters:

Name Type Description Default
called_entity Union[Service, TaskCall]

The invoked call (service or task call).

required

Returns:

Type Description
bool

True if the parameters of a Service or Task call are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_call_parameters(self, called_entity: Union[Service, TaskCall], task: Task) -> bool:
    """Checks if the parameters of a Service or Task call are valid (input and output).

    Args:
        called_entity: The invoked call (service or task call).

    Returns:
        True if the parameters of a Service or Task call are valid.
    """
    valid = True
    if called_entity.input_parameters:
        valid = self.check_call_input_parameters(called_entity, task)
    if called_entity.output_parameters:
        valid = valid & self.check_call_output_parameters(called_entity)

    return valid
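The results of the two checks are combined with `&` rather than `and`. On Python booleans `&` does not short-circuit, so the output parameter check still runs (and can report its own errors) after the input check has failed. The following sketch illustrates the difference; `failing_check` is a hypothetical helper that records its call and always fails.

```python
from typing import List

reported: List[str] = []


def failing_check(name: str) -> bool:
    """Hypothetical check that records that it ran and then fails."""
    reported.append(name)
    return False


# Combination with &: both checks run
valid = failing_check("input")
valid = valid & failing_check("output")
reported_with_ampersand = list(reported)

# Combination with and: the second check is skipped after the first failure
reported.clear()
short_circuit_valid = failing_check("input") and failing_check("output")
reported_with_and = list(reported)
```

Here `reported_with_ampersand` contains both check names, whereas `reported_with_and` only contains the first one.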
check_conditional_statement(condition, task)

Calls check methods for the conditional statement.

Calls check_statement for the Passed and Failed statements and checks if the boolean expression is valid.

Returns:

Type Description
bool

True if the conditional statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_conditional_statement(self, condition: Condition, task: Task) -> bool:
    """Calls check methods for the conditional statement.

    Calls check_statement for the Passed and Failed statements and checks if the
    boolean expression is valid.

    Returns:
        True if the conditional statement is valid.
    """
    valid = True
    for statement in condition.passed_stmts:
        if not self.check_statement(statement, task):
            valid = False

    for statement in condition.failed_stmts:
        if not self.check_statement(statement, task):
            valid = False

    if not self.check_expression(condition.expression, condition.context, task):
        valid = False
    return valid
check_counting_loop(counting_loop, task)

Calls check methods for the Counting Loop statement.

Returns:

Type Description
bool

True if the Counting Loop statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_counting_loop(self, counting_loop: CountingLoop, task: Task) -> bool:
    """Calls check methods for the Counting Loop statement.

    Returns:
        True if the Counting Loop statement is valid.
    """
    valid = True
    for statement in counting_loop.statements:
        if not self.check_statement(statement, task):
            valid = False

    return valid
check_expression(expression, context, task)

Executes checks to test given expression.

Returns:

Type Description
bool

True if the given expression is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_expression(
    self, expression: Union[str, dict], context: ParserRuleContext, task: Task
) -> bool:
    """Executes checks to test given expression.

    Returns:
        True if the given expression is valid.
    """
    if isinstance(expression, (list, str)):
        if not self.check_single_expression(expression, context, task):
            return False
    elif isinstance(expression, dict):
        if len(expression) == 2:
            if not self.check_unary_operation(expression, context, task):
                return False
        else:
            if not self.check_binary_operation(expression, context, task):
                return False
    return True
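The dispatch above depends only on the shape of the expression. As a rough illustration, the hypothetical function `classify_expression` below names the check that an expression would be routed to. The dictionary keys `unOp` and `value` in the usage note are assumptions made for this example; the source only shows that a unary expression is a dictionary with two entries.

```python
from typing import Any


def classify_expression(expression: Any) -> str:
    """Name the check that an expression of this shape would be routed to."""
    if isinstance(expression, (list, str)):
        return "single"
    if isinstance(expression, dict):
        # two entries: unary operation; otherwise it is treated as binary
        return "unary" if len(expression) == 2 else "binary"
    return "none"  # any other value passes without a further check
```

For example, `classify_expression(["order", "id"])` yields `"single"`, `classify_expression({"unOp": "!", "value": "ready"})` yields `"unary"`, and a dictionary with `left`, `binOp` and `right` yields `"binary"`. A value that is neither a string, a list, nor a dictionary is not routed to any check, which matches the method returning True in that case.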
check_for_missing_attribute_in_struct(struct, struct_definition)

Checks if an instantiated Struct is missing an attribute from the Struct definition.

Returns:

Type Description
bool

True if no attributes are missing.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_for_missing_attribute_in_struct(
    self, struct: Struct, struct_definition: Struct
) -> bool:
    """Checks if an instantiated Struct is missing an attribute from the Struct definition.

    Returns:
        True if no attributes are missing.
    """
    for attribute in struct_definition.attributes:
        if attribute not in struct.attributes:
            error_msg = (
                f"Attribute '{attribute}' is not defined in "
                f"the instantiated struct '{struct.name}'"
            )
            self.error_handler.print_error(error_msg, context=struct.context)
            return False
    return True
check_for_unknown_attribute_in_struct(struct_instance, identifier, struct_definition)

Checks if the given identifier in the instantiated struct exists in the struct definition.

Parameters:

Name Type Description Default
struct_instance Struct

The instantiated struct that is checked.

required
identifier str

The identifier in the struct instance which should be checked.

required
struct_definition Struct

The corresponding struct definition.

required

Returns:

Type Description
bool

True if the given identifier exists in the struct definition.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_for_unknown_attribute_in_struct(
    self, struct_instance: Struct, identifier: str, struct_definition: Struct
) -> bool:
    """Checks if the given identifier in the instantiated struct
    exists in the struct definition.

    Args:
        struct_instance: The instantiated struct that is checked.
        identifier: The identifier in the struct instance which should be checked.
        struct_definition: The corresponding struct definition.

    Returns:
        True if the given identifier exists in the struct definition.
    """
    if identifier not in struct_definition.attributes:
        error_msg = (
            f"Unknown attribute '{identifier}' in instantiated "
            f"struct '{struct_definition.name}'"
        )
        self.error_handler.print_error(error_msg, context=struct_instance.context)
        return False
    return True
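Together with check_for_missing_attribute_in_struct, this method compares the attribute names of an instance with those of its definition in both directions. A minimal sketch of that comparison, using plain dictionaries and a hypothetical `compare_attributes` function instead of Struct objects:

```python
from typing import Dict, List, Tuple


def compare_attributes(
    instance: Dict[str, object], definition: Dict[str, str]
) -> Tuple[List[str], List[str]]:
    """Return (missing, unknown) attribute names of an instance."""
    missing = [name for name in definition if name not in instance]
    unknown = [name for name in instance if name not in definition]
    return missing, unknown


# Hypothetical example data
definition = {"id": "number", "name": "string"}
instance = {"id": 1, "colour": "red"}
missing, unknown = compare_attributes(instance, definition)
```

In this example `name` is missing and `colour` is unknown. Unlike the sketch, the missing-attribute check shown earlier stops at the first missing attribute it finds.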
check_for_unknown_datatypes_in_struct_definition(struct)

Checks for each attribute definition in the struct if the defined type exists.

Returns:

Type Description
bool

True if all datatypes in the given struct are known.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_for_unknown_datatypes_in_struct_definition(self, struct: Struct) -> bool:
    """Checks for each attribute definition in the struct if the defined type exists.

    Returns:
        True if all datatypes in the given struct are known.
    """
    datatypes_known = True
    for identifier, attribute_type in struct.attributes.items():
        if not self.check_if_variable_definition_is_valid(
            identifier, attribute_type, struct.context
        ):
            datatypes_known = False
    return datatypes_known
check_for_wrong_attribute_type_in_struct(struct_instance, identifier, struct_definition)

Calls check methods for the attribute assignments in an instantiated Struct.

This method assumes that the given identifier exists in the struct definition.

Parameters:

Name Type Description Default
struct_instance Struct

The instantiated struct that is checked.

required
identifier str

The identifier in the struct instance which should be checked.

required
struct_definition Struct

The corresponding struct definition.

required

Returns:

Type Description
bool

True if the given identifier in the struct instance matches with the struct definition.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_for_wrong_attribute_type_in_struct(
    self, struct_instance: Struct, identifier: str, struct_definition: Struct
) -> bool:
    """Calls check methods for the attribute assignments in an instantiated Struct.

    This method assumes that the given identifier exists in the struct definition.

    Args:
        struct_instance: The instantiated struct that is checked.
        identifier: The identifier in the struct instance which should be checked.
        struct_definition: The corresponding struct definition.

    Returns:
        True if the given identifier in the struct instance matches with the struct definition.
    """
    correct_attribute_type = struct_definition.attributes[identifier]
    attribute = struct_instance.attributes[identifier]

    if isinstance(correct_attribute_type, str):
        if correct_attribute_type in self.structs:

            # check for structs which has structs as attribute
            if isinstance(attribute, Struct):
                attribute.name = correct_attribute_type
                struct_def = self.structs[correct_attribute_type]
                struct_correct = True
                for identifier in attribute.attributes:
                    if not self.check_for_wrong_attribute_type_in_struct(
                        attribute, identifier, struct_def
                    ):
                        struct_correct = False
                return struct_correct
            error_msg = (
                f"Attribute '{identifier}' has the wrong type in the "
                f"instantiated Struct '{struct_instance.name}', expected "
                f"Struct '{correct_attribute_type}'"
            )
            self.error_handler.print_error(error_msg, context=struct_instance.context)
            return False
        if not helpers.check_type_of_value(attribute, correct_attribute_type):
            error_msg = (
                f"Attribute '{identifier}' has the wrong type in the instantiated"
                f" Struct '{struct_instance.name}', expected '{correct_attribute_type}'"
            )
            self.error_handler.print_error(error_msg, context=struct_instance.context)
            return False

    elif isinstance(correct_attribute_type, Array):
        if not isinstance(attribute, Array) or not self.check_array(
            attribute, correct_attribute_type
        ):
            error_msg = (
                f"Attribute '{identifier}' has the wrong type in the instantiated"
                f" Struct '{struct_instance.name}', expected 'Array'"
            )
            self.error_handler.print_error(error_msg, context=struct_instance.context)
            return False
    return True
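The recursive descent into nested structs can be sketched as follows. This is a simplified illustration with plain dictionaries: `DEFINITIONS`, `matches_primitive` and `find_type_errors` are hypothetical names, arrays are left out, and the primitive check is only an assumption about what the scheduler's own helper does.

```python
from typing import Any, Dict, List

# Hypothetical struct definitions: attribute name -> type name
DEFINITIONS: Dict[str, Dict[str, str]] = {
    "Position": {"x": "number", "y": "number"},
    "Robot": {"name": "string", "position": "Position"},
}


def matches_primitive(value: Any, type_name: str) -> bool:
    """Hypothetical primitive check (assumed type names: number, string, boolean)."""
    if type_name == "number":
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if type_name == "string":
        return isinstance(value, str)
    if type_name == "boolean":
        return isinstance(value, bool)
    return False


def find_type_errors(instance: Dict[str, Any], struct_name: str) -> List[str]:
    """Collect type errors, descending into nested struct instances."""
    errors: List[str] = []
    definition = DEFINITIONS[struct_name]
    for identifier, value in instance.items():
        expected = definition[identifier]  # assumes the identifier is known
        if expected in DEFINITIONS:
            if isinstance(value, dict):
                errors.extend(find_type_errors(value, expected))
            else:
                errors.append(
                    f"'{identifier}' in '{struct_name}': expected Struct '{expected}'"
                )
        elif not matches_primitive(value, expected):
            errors.append(f"'{identifier}' in '{struct_name}': expected '{expected}'")
    return errors
```

Calling `find_type_errors({"name": "r1", "position": {"x": "left", "y": 2}}, "Robot")` reports the error inside the nested `Position`, because the check recurses with the definition of the expected struct.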
check_if_input_parameter_matches(input_parameter, identifier, defined_type, task_call, called_task, task_context)

Checks if an input parameter of a TaskCall matches the corresponding input parameter of the called Task.

This method assumes that the validity of the input parameter itself was already checked.

Parameters:

Name Type Description Default
input_parameter Union[str, List[str], Struct]

The input parameter of the TaskCall.

required
identifier str

Parameter name of the input in the called task (only for error message).

required
defined_type Union[str, Array]

Type of the input in the called task.

required
task_call TaskCall

The TaskCall the input parameter is from.

required
called_task Task

The Task the TaskCall is referring to (the called task).

required
task_context Task

The Task in which the TaskCall was invoked.

required

Returns:

Type Description
bool

True if the input parameter of the TaskCall matches the called Task.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_if_input_parameter_matches(
    self,
    input_parameter: Union[str, List[str], Struct],
    identifier: str,
    defined_type: Union[str, Array],
    task_call: TaskCall,
    called_task: Task,
    task_context: Task,
) -> bool:
    """Checks if the input parameters of a Taskcall matches with the called Task.

    This method assumes that the validity of the input parameter itself was already checked.

    Args:
        input_parameter: The input parameter of the TaskCall.
        identifier: Parameter name of the input in the called task (only for error message).
        defined_type: Type of the input in the called task.
        task_call: The TaskCall the input parameter is from.
        called_task: The Task the TaskCall is referring to (the called task).
        task_context: The Task in which the TaskCall was invoked.

    Returns:
        True if the input parameter of the TaskCall matches the called Task.
    """
    if isinstance(input_parameter, str):
        if input_parameter in task_context.variables:
            type_of_variable = task_context.variables[input_parameter]

            # str() because of possible Arrays as
            # types (we can compare types by converting Array object to string)
            if str(type_of_variable) != str(defined_type):
                error_msg = (
                    f"Type of TaskCall parameter '{input_parameter}' does not match "
                    f"with type '{defined_type}' of Input Parameter '{identifier}' in"
                    f" Task '{called_task.name}'"
                )
                self.error_handler.print_error(
                    error_msg,
                    context=task_call.context,
                    off_symbol_length=len(task_call.name),
                )
                return False
        else:
            error_msg = (
                f"Type of TaskCall parameter '{input_parameter}' does not match with "
                f"type '{defined_type}' of Input Parameter '{identifier}' in Task "
                f"'{called_task.name}'"
            )
            self.error_handler.print_error(
                error_msg,
                context=task_call.context,
                off_symbol_length=len(task_call.name),
            )
            return False
    elif isinstance(input_parameter, list):
        # At this point it is known that the variable chain is valid, so dont check again
        current_struct = self.structs[task_context.variables[input_parameter[0]]]
        i = 1
        while i < len(input_parameter) - 1:
            element = current_struct.attributes[input_parameter[i]]
            if isinstance(element, Array):
                i = i + 1
                current_struct = self.structs[element.type_of_elements]
            else:
                current_struct = self.structs[element]
            i = i + 1

        index = len(input_parameter) - 1
        if input_parameter[index].startswith("["):
            given_type = current_struct.name
        else:
            given_type = current_struct.attributes[input_parameter[index]]
        if given_type != defined_type:
            error_msg = (
                "Type of TaskCall parameter "
                f"'{input_parameter[len(input_parameter)-1]}' does not match with "
                f"type '{defined_type}' of Input Parameter '{identifier}' "
                f"in Task '{called_task.name}'"
            )
            self.error_handler.print_error(
                error_msg,
                context=task_call.context,
                off_symbol_length=len(task_call.name),
            )
            return False
    elif isinstance(input_parameter, Struct):
        if input_parameter.name != defined_type:
            error_msg = (
                f"Type of TaskCall parameter '{input_parameter.name}' does not match "
                f"with type '{defined_type}' of Input Parameter '{identifier}' in "
                f"Task '{called_task.name}'"
            )
            self.error_handler.print_error(
                error_msg,
                context=task_call.context,
                off_symbol_length=len(task_call.name),
            )
            return False
    return True
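For a parameter given as a variable list, the method first resolves the type at the end of the chain and then compares it with the defined type. The resolution step can be sketched as below. `ArrayType` and `resolve_type` are hypothetical names, struct definitions are modelled as plain dictionaries, and the chain is assumed to be valid already, as in the original.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass
class ArrayType:
    """Hypothetical stand-in for the scheduler's Array type."""
    type_of_elements: str


AttributeType = Union[str, ArrayType]


def resolve_type(
    chain: List[str],
    variables: Dict[str, str],
    structs: Dict[str, Dict[str, AttributeType]],
) -> AttributeType:
    """Resolve the type that a valid access chain such as x.y[0].z ends in."""
    current_name = variables[chain[0]]
    i = 1
    while i < len(chain) - 1:
        element = structs[current_name][chain[i]]
        if isinstance(element, ArrayType):
            i += 1  # skip the index element that follows an array attribute
            current_name = element.type_of_elements
        else:
            current_name = element
        i += 1
    last = chain[-1]
    if last.startswith("["):
        return current_name  # the chain ends in an array element
    return structs[current_name][last]


# Hypothetical example data
structs = {
    "Order": {"id": "number", "items": ArrayType("Item")},
    "Item": {"weight": "number"},
}
variables = {"order": "Order"}
```

With this data, `["order", "items", "[0]", "weight"]` resolves to `"number"` and `["order", "items", "[0]"]` resolves to `"Item"`.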
check_if_struct_exists(struct)

Checks if the given Struct instance refers to an existing Struct definition.

Returns:

Type Description
bool

True if the given Struct instance refers to an existing Struct definition.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_if_struct_exists(self, struct: Struct) -> bool:
    """Checks if the given Struct instance refers to a existing Struct definition.

    Returns:
        True if the given Struct instance refers to an existing Struct definition.
    """
    if struct.name not in self.structs:
        error_msg = f"Unknown Struct '{struct.name}'"
        self.error_handler.print_error(error_msg, context=struct.context)
        return False
    return True
check_if_task_call_matches_with_called_task(task_call, task)

Checks if the parameters of the TaskCall match the parameters of the called Task.

This method assumes that the validity of the input parameter itself was already checked.

Multiple checks are done:

(1) Checks if length of parameters of Taskcall and Task match.
(2) Checks if types of input parameters match.
(3) Checks if types of output parameters match.

Parameters:

Name Type Description Default
task_call TaskCall

The task call to be checked.

required
task Task

The task in which the task call is defined.

required

Returns:

Type Description
bool

True if the parameters match.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_if_task_call_matches_with_called_task(self, task_call: TaskCall, task: Task) -> bool:
    """Checks if the parameters of the Taskcall matches with the parameters of the Task.

    This method assumes that the validity of the input parameter itself was already checked.

    Multiple Checks are done:
        (1) Checks if length of parameters of Taskcall and Task match.
        (2) Checks if types of input parameters match.
        (3) Checks if types of output parameters match.

    Args:
        task_call: The task call to be checked.
        task: The task in which the task call is defined.

    Returns:
        True if the parameters match.
    """
    called_task = self.tasks[task_call.name]
    if not self.check_if_task_call_parameter_length_match(task_call):
        return False

    valid = True
    for i, input_parameter in enumerate(task_call.input_parameters):
        identifier = list(called_task.input_parameters.items())[i][0]
        defined_type = list(called_task.input_parameters.items())[i][1]
        if not self.check_if_input_parameter_matches(
            input_parameter, identifier, defined_type, task_call, called_task, task
        ):
            valid = False

    for i, (identifier, data_type) in enumerate(task_call.output_parameters.items()):
        variable_in_called_task = called_task.output_parameters[i]
        if variable_in_called_task in called_task.variables:
            type_of_variable = called_task.variables[variable_in_called_task]

            if str(type_of_variable) != str(data_type):
                error_msg = (
                    f"Type of TaskCall output parameter at position {str((i+1))} does not "
                    f"match with type '{type_of_variable}' of output parameter "
                    f"'{variable_in_called_task}' in Task '{called_task.name}'"
                )
                self.error_handler.print_error(
                    error_msg,
                    context=task_call.context,
                    off_symbol_length=len(task_call.name),
                )
                valid = False

    return valid
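Input parameters are matched by position: the i-th parameter of the call is compared with the i-th input parameter defined by the called Task. The sketch below shows the same pairing with `zip`, as one possible alternative to indexing into `list(...items())`. The function `find_input_mismatches` and the message texts are hypothetical, and the types of the call parameters are assumed to be resolved already.

```python
from typing import Dict, List


def find_input_mismatches(
    given_types: List[str], defined_inputs: Dict[str, str]
) -> List[str]:
    """Compare call parameter types with the defined input types by position."""
    if len(given_types) != len(defined_inputs):
        return ["parameter length mismatch"]
    errors: List[str] = []
    pairs = zip(given_types, defined_inputs.items())
    for position, (given, (identifier, defined)) in enumerate(pairs, start=1):
        # str() allows non-string type objects to be compared by their text form
        if str(given) != str(defined):
            errors.append(
                f"position {position}: expected '{defined}' for '{identifier}', got '{given}'"
            )
    return errors


# Hypothetical input parameters of a called Task, in definition order
called_inputs = {"target": "Position", "speed": "number"}
```

The length check comes first, as in the method above, so the positional comparison never runs on lists of different length.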
check_if_task_call_parameter_length_match(task_call)

Checks if the number of input and output parameters of the Task call matches the called Task.

Returns:

Type Description
bool

True if the parameter lengths match.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_if_task_call_parameter_length_match(self, task_call: TaskCall) -> bool:
    """Checks if the length of the Task call parameters match with the called Task.

    Returns:
        True if the parameter lengths match.
    """
    called_task = self.tasks[task_call.name]
    if len(called_task.input_parameters) != len(task_call.input_parameters):
        error_msg = "Inputparameter length of Task Call and called Task dont match"
        self.error_handler.print_error(error_msg, context=task_call.context)
        return False
    if len(called_task.output_parameters) != len(task_call.output_parameters):
        error_msg = "Outputparameter length of Task Call and called Task dont match"
        self.error_handler.print_error(error_msg, context=task_call.context)
        return False
    return True
check_if_task_in_taskcall_exists(task_name, context)

Checks if the Task name in the Taskcall belongs to a Task.

Returns:

Type Description
bool

True if the given name belongs to a Task.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_if_task_in_taskcall_exists(self, task_name: str, context: ParserRuleContext) -> bool:
    """Checks if the Task name in the Taskcall belongs to a Task.

    Returns:
        True if the given name belongs to a Task.
    """
    if task_name not in self.tasks:
        error_msg = "Unknown Task '" + task_name + "'"
        self.error_handler.print_error(error_msg, context=context)
        return False
    return True
check_if_variable_definition_is_valid(identifier, variable_type, context)

Checks if the type of the variable definition is known. For an Array, the type of its elements is checked.

Returns:

Type Description
bool

True if variable definition is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_if_variable_definition_is_valid(
    self, identifier: str, variable_type: Union[str, Array], context
) -> bool:
    """Checks if the variable has the correct type.

    Returns:
        True if variable definition is valid.
    """
    valid = True
    if isinstance(variable_type, str):
        if not self.variable_type_exists(variable_type):
            valid = False
    elif isinstance(variable_type, Array):
        element_type = variable_type.type_of_elements
        if not self.variable_type_exists(element_type):
            valid = False

    if not valid:
        error_msg = (
            f"Unknown data type '{variable_type}' for task " f"input variable '{identifier}'"
        )
        self.error_handler.print_error(error_msg, context=context)
        return False

    return True
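The distinction between plain types and arrays can be sketched as follows. The sketch assumes that a type is known if it is one of the primitive names `number`, `string` or `boolean`, or the name of a defined Struct; the behaviour of `variable_type_exists` is not shown in this section, so `type_exists`, `PRIMITIVES` and `ArrayType` are hypothetical.

```python
from dataclasses import dataclass
from typing import Set, Union

PRIMITIVES = ("number", "string", "boolean")  # assumed primitive type names


@dataclass
class ArrayType:
    """Hypothetical stand-in for the scheduler's Array type."""
    type_of_elements: str


def type_exists(type_name: str, struct_names: Set[str]) -> bool:
    """Assumed rule: a type is known if it is primitive or a defined Struct."""
    return type_name in PRIMITIVES or type_name in struct_names


def definition_is_valid(
    variable_type: Union[str, ArrayType], struct_names: Set[str]
) -> bool:
    """For arrays the element type is checked, otherwise the type itself."""
    if isinstance(variable_type, ArrayType):
        return type_exists(variable_type.type_of_elements, struct_names)
    return type_exists(variable_type, struct_names)


struct_names = {"Position"}  # hypothetical set of defined Struct names
```

Under these assumptions `definition_is_valid(ArrayType("Position"), struct_names)` holds, while `definition_is_valid("Pose", struct_names)` does not.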
check_instantiated_struct_attributes(struct_instance)

Calls multiple check methods to validate an instantiated Struct.

Multiple checks are done:

(1) Check if the name of the struct instance exists in the struct definitions.
(2) Check if attributes from the struct definition are missing.
(3) Check if there are attributes in the instance that do not exist in the definition.
(4) Check if attributes in the instance do not match with attributes in the definition.

Parameters:

Name Type Description Default
struct_instance Struct

The instantiated struct that is checked.

required

Returns:

Type Description
bool

True if the instantiated Struct is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_instantiated_struct_attributes(self, struct_instance: Struct) -> bool:
    """Calls multiple check methods to validate an instantiated Struct.

    Multiple Checks are done:
        (1) Check if the name of the struct instance exists in the struct definitions.
        (2) Check if attributes from the struct definition are missing.
        (3) Check if there are attributes in the instance that do not exist in the definition.
        (4) Check if attributes in the instance do not match with attributes in the definition.

    Args:
        struct_instance: The instantiated struct that is checked.

    Returns:
        True if the instantiated Struct is valid.
    """
    valid = True
    if self.check_if_struct_exists(struct_instance):
        struct_definition = self.structs[struct_instance.name]

        if not self.check_for_missing_attribute_in_struct(struct_instance, struct_definition):
            valid = False

        for identifier in struct_instance.attributes:
            if not (
                self.check_for_unknown_attribute_in_struct(
                    struct_instance, identifier, struct_definition
                )
                and self.check_for_wrong_attribute_type_in_struct(
                    struct_instance, identifier, struct_definition
                )
            ):
                valid = False
    else:
        valid = False

    return valid
check_parallel(parallel, task)

Calls check methods for the Parallel statement.

Returns:

Type Description
bool

True if the given Parallel statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_parallel(self, parallel: Parallel, task: Task) -> bool:
    """Calls check methods for the Parallel statement.

    Returns:
        True if the given Parallel statement is valid.
    """

    valid = True
    for task_call in parallel.task_calls:
        if not self.check_task_call(task_call, task):
            valid = False
    return valid
check_service(service, task)

Calls check methods for the Service or Service Call statement.

Returns:

Type  Description
bool  True if the given Service statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_service(self, service: Service, task: Task) -> bool:
    """Calls check methods for the Service or Service Call statement.

    Returns:
        True if the given Service statement is valid.
    """
    return self.check_call_parameters(service, task)
check_single_expression(expression, context, task)

Checks if a single expression is a valid expression.

Returns:

Type  Description
bool  True if the given single expression is a valid expression.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_single_expression(
    self, expression: Union[str, list], context: ParserRuleContext, task: Task
) -> bool:
    """Checks if a single expression is a valid expression.

    Returns:
        True if the given single expression is a valid expression.
    """
    if isinstance(expression, (str, int, float, bool)):
        return True
    if isinstance(expression, list):
        if not self.check_attribute_access(expression, context, task):
            return False

        variable_type = helpers.get_type_of_variable_list(expression, task, self.structs)
        if not (isinstance(variable_type, str) and variable_type in ["number", "boolean"]):
            msg = "The given attribute can not be resolved to a boolean expression"
            self.error_handler.print_error(
                msg, context=context, off_symbol_length=len(expression)
            )
            return False
    return True
check_statement(statement, task)

Calls check methods depending on the type of the statement.

Returns:

Type  Description
bool  True if the given statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_statement(
    self,
    statement: Union[Service, TaskCall, Parallel, WhileLoop, CountingLoop, Condition],
    task: Task,
) -> bool:
    """Calls check methods depending on the type of the statement.

    Returns:
        True if the given statement is valid.
    """
    if isinstance(statement, Service):
        return self.check_service(statement, task)
    if isinstance(statement, TaskCall):
        return self.check_task_call(statement, task)
    if isinstance(statement, Parallel):
        return self.check_parallel(statement, task)
    if isinstance(statement, WhileLoop):
        return self.check_while_loop(statement, task)
    if isinstance(statement, CountingLoop):
        return self.check_counting_loop(statement, task)
    return self.check_conditional_statement(statement, task)
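The `isinstance` chain above is a type-based dispatch. One alternative way to express the same idea is a lookup table from statement type to handler, sketched below. The classes `Service`, `TaskCall` and `Condition` here are empty placeholders, not the scheduler's model classes, and the handlers return labels only so the routing is visible.

```python
from typing import Callable, Dict


class Service:
    """Placeholder for a Service statement."""


class TaskCall:
    """Placeholder for a TaskCall statement."""


class Condition:
    """Placeholder for a Condition statement."""


def check_service(statement) -> str:
    return "service"


def check_task_call(statement) -> str:
    return "task_call"


def check_condition(statement) -> str:
    return "condition"


# Dictionaries keep insertion order, so earlier entries take precedence.
HANDLERS: Dict[type, Callable] = {
    Service: check_service,
    TaskCall: check_task_call,
}


def check_statement(statement) -> str:
    for statement_type, handler in HANDLERS.items():
        if isinstance(statement, statement_type):
            return handler(statement)
    # Fall back to the conditional check, mirroring the final return above.
    return check_condition(statement)
```

Whether a table reads better than an explicit chain is a matter of taste; with only a handful of statement types the chain used in the source is arguably just as clear.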
check_statements(task)

Executes semantic checks for all statements in a Task.

Returns:

Type  Description
bool  True if all Statements are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_statements(self, task: Task) -> bool:
    """Executes semantic checks for all statements in a Task.

    Returns:
        True if all Statements are valid.
    """
    valid = True
    for statement in task.statements:
        if not self.check_statement(statement, task):
            valid = False
    return valid
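The loop deliberately keeps going after a failed check so that every statement gets the chance to report its errors. The sketch below contrasts that with `all()`, using a hypothetical `check` function that records which items it has seen. A list comprehension preserves the behaviour of the loop, while a generator expression would stop at the first failure.

```python
seen = []


def check(item: int) -> bool:
    """Hypothetical check: records the item and accepts only even numbers."""
    seen.append(item)
    return item % 2 == 0


items = [1, 2, 3]

# Loop form, as in the source: every item is checked.
valid = True
for item in items:
    if not check(item):
        valid = False
seen_by_loop = list(seen)

seen.clear()
# Same behaviour: the list is built completely before all() inspects it.
valid_eager = all([check(item) for item in items])
seen_by_list = list(seen)

seen.clear()
# A generator expression stops at the first False and skips later checks.
valid_lazy = all(check(item) for item in items)
seen_by_generator = list(seen)
```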
check_structs()

Executes semantic checks for each Struct.

Returns:

Type  Description
bool  True if all Struct definitions are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_structs(self) -> bool:
    """Executes semantic checks for each Struct.

    Returns:
        True if all Struct definitions are valid.
    """
    valid = True
    for struct in self.structs.values():
        if not self.check_for_unknown_datatypes_in_struct_definition(struct):
            valid = False
    return valid
check_task_call(task_call, task_context)

Calls check methods for the TaskCall statement.

Parameters:

Name          Type      Description                                  Default
task_call     TaskCall  The task call to be checked.                 required
task_context  Task      The task in which the task call is defined.  required

Returns:

Type  Description
bool  True if the given TaskCall statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_task_call(self, task_call: TaskCall, task_context: Task) -> bool:
    """Calls check methods for the TaskCall statement.

    Args:
        task_call: The task call to be checked.
        task_context: The task in which the task call is defined.

    Returns:
        True if the given TaskCall statement is valid.
    """
    if self.check_if_task_in_taskcall_exists(task_call.name, task_call.context):
        if not (
            self.check_call_parameters(task_call, task_context)
            and self.check_if_task_call_matches_with_called_task(task_call, task_context)
        ):
            return False
    else:
        return False
    return True
check_task_input_parameters(task)

Checks if the input parameters are valid.

Returns:

Type  Description
bool  True if the input parameters of the given Task are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_task_input_parameters(self, task: Task) -> bool:
    """Checks if the input parameters are valid.

    Returns:
        True if the input parameters of the given Task are valid.
    """
    valid = True
    # input_parameters: <identifier>: <variable_type>
    for identifier, variable_type in task.input_parameters.items():
        if not self.check_if_variable_definition_is_valid(
            identifier, variable_type, context=task.context_dict[IN_KEY]
        ):
            valid = False
    return valid
check_task_output_parameters(task)

Checks if the output parameters are valid.

Checks if the variable names used as parameters are variables defined in the Task.

Returns:

Type  Description
bool  True if the output parameters of the given Task are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_task_output_parameters(self, task: Task) -> bool:
    """Checks if the output parameters are valid.

    Checks if the variable names used as parameters are variables
    defined in the Task.

    Returns:
        True if the output parameters of the given Task are valid.
    """
    valid = True
    for output_param in task.output_parameters:
        if output_param not in task.variables:
            error_msg = (
                f"An unknown variable '{output_param}' is used "
                f"in the Task Output of Task '{task.name}'"
            )
            self.error_handler.print_error(error_msg, context=task.context_dict[OUT_KEY])
            valid = False
    return valid
check_tasks()

Executes semantic checks for each Task.

Returns:

Type  Description
bool  True if all Task definitions are valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_tasks(self) -> bool:
    """Executes semantic checks for each Task.

    Returns:
        True if all Task definitions are valid.
    """
    valid = True

    start_task_found = False
    for task in self.tasks.values():
        if task.name == START_TASK:
            start_task_found = True

        # use & so all methods will be executed even if a method returns False
        if not (
            self.check_statements(task)
            & self.check_task_input_parameters(task)
            & self.check_task_output_parameters(task)
        ):
            valid = False

    if not start_task_found:
        error_msg = "The file contains no 'productionTask' (Starting Point)"
        self.error_handler.print_error(error_msg, line=1, column=0, off_symbol_length=5)
        return False

    return valid
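The comment about `&` deserves a short illustration. With `and`, Python stops at the first falsy operand, so later checks would never run and their errors would never be reported; `&` on `bool` values evaluates both sides. The `check` helper and the `calls` list below are hypothetical and exist only to record which functions ran.

```python
calls = []


def check(name: str, result: bool) -> bool:
    """Hypothetical check that records its invocation and returns a fixed result."""
    calls.append(name)
    return result


# `and` short-circuits: the second check never runs.
short_circuit = check("a", False) and check("b", True)
ran_with_and = list(calls)

calls.clear()

# `&` evaluates both operands, so every check gets to report its errors.
eager = check("a", False) & check("b", True)
ran_with_ampersand = list(calls)
```

This relies on every operand being a real `bool`; for other types `&` is a bitwise operation and may not yield a truth value.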
check_unary_operation(expression, context, task)

Checks if a unary expression is a valid expression.

Returns:

Type  Description
bool  True if the given unary expression is a valid expression.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_unary_operation(self, expression, context: ParserRuleContext, task: Task) -> bool:
    """Checks if a unary expression is a valid expression.

    Returns:
        True if the given unary expression is a valid expression.
    """
    return self.check_expression(expression["value"], context, task)
check_while_loop(while_loop, task)

Calls check methods for the While Loop statement.

Returns:

Type  Description
bool  True if the While Loop statement is valid.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def check_while_loop(self, while_loop: WhileLoop, task: Task) -> bool:
    """Calls check methods for the While Loop statement.

    Returns:
        True if the While Loop statement is valid.
    """
    valid = True
    for statement in while_loop.statements:
        if not self.check_statement(statement, task):
            valid = False

    if not self.check_expression(while_loop.expression, while_loop.context, task):
        valid = False
    return valid
expression_is_number(expression, task)

Checks if the given expression is a number (int or float).

Returns:

Type  Description
bool  True if the given expression is a number.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def expression_is_number(self, expression, task: Task) -> bool:
    """Checks if the given expression is a number (int or float).

    Returns:
        True if the given expression is a number.
    """
    if isinstance(expression, (int, float, bool)):
        return True
    if isinstance(expression, list):
        given_type = helpers.get_type_of_variable_list(expression, task, self.structs)
        return isinstance(given_type, str) and given_type == "number"
    if isinstance(expression, dict):
        if expression["left"] == "(" and expression["right"] == ")":
            return self.expression_is_number(expression["binOp"], task)
        else:
            return self.expression_is_number(
                expression["left"], task
            ) and self.expression_is_number(expression["right"], task)

    return False
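To see how the recursion handles nested expressions, here is a standalone sketch that mirrors the structure above without the scheduler's types. It assumes the dictionary layout visible in the source (`left`, `binOp`, `right`) and omits the variable-list branch, which needs `helpers` and a `Task`. The name `is_number` and the sample `nested` expression are hypothetical.

```python
def is_number(expression) -> bool:
    """Return True if every leaf of the expression tree is numeric."""
    # bool is a subclass of int in Python, so True and False are accepted too.
    if isinstance(expression, (int, float)):
        return True
    if isinstance(expression, dict):
        if expression["left"] == "(" and expression["right"] == ")":
            # Parenthesised expression: the inner expression sits under "binOp".
            return is_number(expression["binOp"])
        return is_number(expression["left"]) and is_number(expression["right"])
    return False


# (1 + 2.5) * 3, with the operator stored under "binOp" (assumed layout)
nested = {
    "left": {
        "left": "(",
        "binOp": {"left": 1, "binOp": "+", "right": 2.5},
        "right": ")",
    },
    "binOp": "*",
    "right": 3,
}
```

Calling `is_number(nested)` walks the outer multiplication, unwraps the parentheses and then checks both operands of the addition.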
expression_is_string(expression, task)

Checks if the given expression is a PFDL string.

Returns:

Type  Description
bool  True if the given expression is a PFDL string.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def expression_is_string(self, expression, task: Task) -> bool:
    """Checks if the given expression is a PFDL string.

    Returns:
        True if the given expression is a PFDL string.
    """
    if isinstance(expression, str):
        return True
    if isinstance(expression, list):
        given_type = helpers.get_type_of_variable_list(expression, task, self.structs)
        return isinstance(given_type, str) and given_type == "string"
    return False
instantiated_array_length_correct(instantiated_array, array_definition)

Checks if the length of the instantiated array matches with the array definition.

Returns:

Type  Description
bool  True if both lengths are equal.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def instantiated_array_length_correct(
    self, instantiated_array: Array, array_definition: Array
) -> bool:
    """Checks if the length of the instantiated array matches with the array definition.

    Returns:
        True if both lengths are equal.
    """
    if array_definition.length != -1:
        return instantiated_array.length == array_definition.length
    # dynamic length
    return True
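The sentinel value `-1` marks an array definition without a fixed length. A minimal sketch of the same rule, using a hypothetical `Array` dataclass in place of the scheduler's model class and a hypothetical function name `length_matches`:

```python
from dataclasses import dataclass


@dataclass
class Array:
    """Hypothetical stand-in for the scheduler's Array model."""

    type_of_elements: str = ""
    length: int = -1  # -1 marks a dynamic length, as in the source above


def length_matches(instance: Array, definition: Array) -> bool:
    if definition.length == -1:  # dynamic length accepts any size
        return True
    return instance.length == definition.length
```

For example, `length_matches(Array("number", 3), Array("number"))` is true because the definition leaves the length open, whereas a definition with `length=2` would reject the same instance.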
validate_process()

Starts static semantic checks.

Returns:

Type  Description
bool  True, if the process has no errors, otherwise False.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def validate_process(self) -> bool:
    """Starts static semantic checks.

    Returns:
        True, if the process has no errors, otherwise False.
    """
    # use & so all methods will be executed even if a method returns False
    return self.check_structs() & self.check_tasks()
variable_type_exists(variable_type)

Checks if the given variable type exists in the PFDL file.

A variable type can be a primitive (number, string or boolean) or an identifier of a defined Struct.

Returns:

Type  Description
bool  True if the variable type exists within the PFDL file.

Source code in pfdl_scheduler/validation/semantic_error_checker.py
def variable_type_exists(self, variable_type: str) -> bool:
    """Checks if the given variable type exists in the PFDL file.

    A variable type can be a primitive (number, string or boolean) or
    an identifier of a defined Struct.

    Returns:
        True if the variable type exists within the PFDL file.
    """
    if variable_type[0].isupper():
        if variable_type not in self.structs:
            return False
    elif variable_type not in ["number", "string", "boolean"]:
        return False
    return True
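The rule relies on a naming convention: an identifier starting with an uppercase letter is treated as a Struct name, anything else must be a primitive. The standalone sketch below restates that rule as a free function; the explicit `structs` parameter and the `PRIMITIVES` constant are assumptions of this example. It also uses `variable_type[:1]` instead of `variable_type[0]` so that an empty string returns `False` rather than raising `IndexError`, which is a deliberate deviation from the source.

```python
PRIMITIVES = {"number", "string", "boolean"}


def variable_type_exists(variable_type: str, structs: dict) -> bool:
    # An uppercase first letter signals a Struct identifier (convention from the source).
    if variable_type[:1].isupper():
        return variable_type in structs
    return variable_type in PRIMITIVES
```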

syntax_error_listener

Contains SyntaxErrorListener class.

SyntaxErrorListener(token_stream, error_handler)

Bases: ErrorListener

Custom ErrorListener for the PFDL.

Overrides the ANTLR ErrorListener class so that our ErrorHandler class can be used for syntax errors.

Attributes:

Name           Type               Description
token_stream   CommonTokenStream  ANTLR token stream.
error_handler  ErrorHandler       ErrorHandler instance for printing errors.

Initialize the object.

Parameters:

Name           Type               Description                                 Default
token_stream   CommonTokenStream  ANTLR token stream.                         required
error_handler  ErrorHandler       ErrorHandler instance for printing errors.  required

Source code in pfdl_scheduler/validation/syntax_error_listener.py
def __init__(self, token_stream: CommonTokenStream, error_handler: ErrorHandler) -> None:
    """Initialize the object.

    Args:
        token_stream: ANTLR token stream.
        error_handler: ErrorHandler instance for printing errors.
    """
    super().__init__()
    self.token_stream: CommonTokenStream = token_stream
    self.error_handler: ErrorHandler = error_handler
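A note on initializer chaining: `super().__init__()` and a bare `super()` behave differently. The bare expression only creates a proxy object and discards it, so the parent initializer never runs. The classes `Base`, `WithoutCall` and `WithCall` below are hypothetical and only demonstrate the difference.

```python
class Base:
    def __init__(self) -> None:
        self.initialized = True


class WithoutCall(Base):
    def __init__(self) -> None:
        super()  # builds a proxy object and discards it; Base.__init__ never runs


class WithCall(Base):
    def __init__(self) -> None:
        super().__init__()  # actually runs Base.__init__
```

An instance of `WithoutCall` has no `initialized` attribute, while an instance of `WithCall` does. Whether the omission matters in practice depends on what the base class sets up in its initializer.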
syntaxError(recognizer, offendingSymbol, line, column, msg, e)

Overrides the ANTLR ErrorListener method to use the error handler.

Source code in pfdl_scheduler/validation/syntax_error_listener.py
def syntaxError(
    self,
    recognizer: Recognizer,
    offendingSymbol: Any,
    line: int,
    column: int,
    msg: str,
    e: RecognitionException,
) -> None:
    """Overwrites the ANTLR ErrorListener method to use the error handler."""
    self.error_handler.print_error(msg, line=line, column=column, syntax_error=True)
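The forwarding behaviour can be tried out without ANTLR installed. In the sketch below, `RecordingErrorHandler` and `ForwardingListener` are hypothetical stand-ins: the first stores errors instead of printing them, the second copies the method above but omits the ANTLR base class. The keyword arguments of `print_error` are assumed from the call shown in the source.

```python
class RecordingErrorHandler:
    """Hypothetical handler that stores errors instead of printing them."""

    def __init__(self) -> None:
        self.errors = []

    def print_error(self, msg, line=0, column=0, syntax_error=False) -> None:
        self.errors.append((line, column, msg, syntax_error))


class ForwardingListener:
    """Stand-in for SyntaxErrorListener without the ANTLR base class."""

    def __init__(self, error_handler) -> None:
        self.error_handler = error_handler

    def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e) -> None:
        # Only the message and position are forwarded; the other arguments are ignored.
        self.error_handler.print_error(msg, line=line, column=column, syntax_error=True)


handler = RecordingErrorHandler()
listener = ForwardingListener(handler)
# ANTLR would call this during parsing; here it is invoked by hand.
listener.syntaxError(None, None, 3, 7, "mismatched input", None)
```

After the call, `handler.errors` holds a single entry with the line, column and message that were passed in, flagged as a syntax error.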