Coverage for src \ ingress_recipient \ types.py: 97%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-15 14:42 +0000

1"""Type definitions for ingress recipient SDK. 

2 

3These types define the contract for data received FROM ingress cores 

4after processing through the 5-stage pipeline. 

5 

6IMPORTANT: All field names use snake_case to match backend conventions. 

7""" 

8 

9from datetime import datetime 

10from typing import Any 

11from uuid import UUID 

12 

13from pydantic import BaseModel, Field, field_validator 

14 

15 

16class RecipientMetadata(BaseModel): 

17 """Metadata about the ingress processing pipeline. 

18 

19 Contains information about how the data was processed before 

20 being sent to the recipient service. 

21 

22 Attributes: 

23 source_ip: Optional source IP address of the original sender 

24 ingress_timestamp: When the data was received by the ingress core 

25 transformations_applied: Optional list of transformation names applied 

26 

27 Example: 

28 ```python 

29 metadata = RecipientMetadata( 

30 source_ip="192.168.1.100", 

31 ingress_timestamp=datetime.now(UTC), 

32 transformations_applied=["normalize", "enrich", "speed_conversion"], 

33 ) 

34 ``` 

35 """ 

36 

37 source_ip: str | None = Field(None, description="Source IP of original sender") 

38 ingress_timestamp: datetime = Field(..., description="Ingress receipt timestamp") 

39 transformations_applied: list[str] | None = Field( 

40 None, description="Transformations applied by ingress pipeline" 

41 ) 

42 

43 

44class RecipientData(BaseModel): 

45 """Data received from ingress core after processing. 

46 

47 This is the standardized contract that ALL recipients must accept. 

48 

49 The ingress core processes data through a 5-stage pipeline: 

50 1. Auth - Validate API key/JWT 

51 2. Validate - JSON Schema validation 

52 3. Transform - Field mapping, enrichment 

53 4. Route - Destination selection 

54 5. Dispatch - Send to recipients 

55 

56 IMPORTANT: All field names use snake_case to match backend conventions. 

57 

58 Attributes: 

59 channel_id: Channel identifier (e.g., 'ch_maritime_telemetry') 

60 tenant_id: Tenant UUID 

61 timestamp: Original data timestamp (from the client, not when ingested) 

62 data: Arbitrary JSON data payload (potentially transformed) 

63 metadata: Ingress processing metadata 

64 

65 Example: 

66 ```python 

67 data = RecipientData( 

68 channel_id="ch_maritime_telemetry", 

69 tenant_id=UUID("123e4567-e89b-12d3-a456-426614174000"), 

70 timestamp=datetime.now(UTC), 

71 data={ 

72 "vessel_id": "v_12345", 

73 "latitude": 59.9139, 

74 "longitude": 10.7522, 

75 "speed": 15.3, 

76 }, 

77 metadata=RecipientMetadata( 

78 ingress_timestamp=datetime.now(UTC), 

79 transformations_applied=["normalize"], 

80 ), 

81 ) 

82 ``` 

83 """ 

84 

85 channel_id: str = Field(..., min_length=1, description="Channel identifier") 

86 tenant_id: UUID = Field(..., description="Tenant UUID") 

87 timestamp: datetime = Field(..., description="Original data timestamp") 

88 data: dict[str, Any] = Field(..., description="Arbitrary JSON data") 

89 metadata: RecipientMetadata = Field(..., description="Ingress metadata") 

90 

91 @field_validator("channel_id") 

92 @classmethod 

93 def validate_channel_id(cls, v: str) -> str: 

94 """Validate channel_id is not empty or whitespace-only. 

95 

96 Args: 

97 v: The channel_id value to validate 

98 

99 Returns: 

100 The validated channel_id 

101 

102 Raises: 

103 ValueError: If channel_id is empty or whitespace-only 

104 """ 

105 if not v.strip(): 

106 raise ValueError("channel_id cannot be empty or whitespace-only") 

107 return v 

108 

109 

110class RecipientResponse(BaseModel): 

111 """Response sent back to ingress core after processing. 

112 

113 Recipients should return this response to indicate whether 

114 they successfully processed the data or not. 

115 

116 Attributes: 

117 status: Processing status ('accepted' or 'rejected') 

118 message: Optional message with details about the processing result 

119 

120 Example: 

121 ```python 

122 # Success 

123 response = RecipientResponse( 

124 status="accepted", 

125 message="Data processed successfully", 

126 ) 

127 

128 # Failure 

129 response = RecipientResponse( 

130 status="rejected", 

131 message="Invalid vessel_id: v_12345 not found", 

132 ) 

133 ``` 

134 """ 

135 

136 status: str = Field(..., pattern="^(accepted|rejected)$", description="Processing status") 

137 message: str | None = Field(None, description="Optional status message") 

138 

139 @field_validator("status") 

140 @classmethod 

141 def validate_status(cls, v: str) -> str: 

142 """Validate status is either 'accepted' or 'rejected'. 

143 

144 Args: 

145 v: The status value to validate 

146 

147 Returns: 

148 The validated status 

149 

150 Raises: 

151 ValueError: If status is not 'accepted' or 'rejected' 

152 """ 

153 if v not in ("accepted", "rejected"): 

154 raise ValueError("status must be 'accepted' or 'rejected'") 

155 return v