File:
app/core/workers/celery_worker.py
Purpose: Real-time water flow monitoring and irrigation calculation system
Criticality: ⚠️ MOST CRITICAL worker in the irrigation system
This worker is the heart of the irrigation monitoring system.
Think of it as: A smart water meter that tells you exactly how much water each irrigation zone used, when it used it, and sends that info to farmers' dashboards immediately.
Water flows → Konteau sensor → Database → PostgreSQL NOTIFY →
Celery Worker → Calculations → Save to DB → Send to External APIs
What it does: Sits and waits for new water flow data 24/7
How it works:
LISTEN new_row_added
Why it's important: This is what makes the system "real-time" - no polling, instant response.
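The LISTEN-based wait can be sketched as follows. This is a minimal illustration, not the worker's actual `listen_for_new_row` code: it assumes the psycopg2 driver, and the names `drain_notifications`, `listen_forever` and the `on_notify` callback are hypothetical. The callback is where a real worker would presumably enqueue its Celery task.

```python
import select


def drain_notifications(conn, on_notify):
    """Dispatch every notification currently queued on a psycopg2 connection."""
    conn.poll()  # move pending messages from the socket into conn.notifies
    handled = 0
    while conn.notifies:
        notify = conn.notifies.pop(0)
        on_notify(notify.channel, notify.payload)
        handled += 1
    return handled


def listen_forever(dsn, on_notify, channel="new_row_added", timeout=5.0):
    """Sleep on the connection socket and react as soon as a NOTIFY arrives."""
    # psycopg2 is an assumed driver; it is imported here so that
    # drain_notifications() stays free of third-party dependencies.
    import psycopg2
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

    conn = psycopg2.connect(dsn)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # LISTEN applies at once
    with conn.cursor() as cur:
        cur.execute(f"LISTEN {channel};")  # channel is a trusted identifier here
    while True:
        # select() blocks until the socket is readable or the timeout expires,
        # so the loop does not poll the database.
        readable, _, _ = select.select([conn], [], [], timeout)
        if readable:
            drain_notifications(conn, on_notify)
```

Keeping the dispatch step separate from the blocking loop makes it possible to exercise the notification handling with a stand-in connection object.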
Configuration:
What it does: The heart of the system - calculates water usage
Parameters: task_id - The celery task ID for this execution
Step-by-step process:
instant_water_flow = current - previous
Example calculation:
Previous reading: 100.5 L
Current reading: 102.3 L
Instant flow: 1.8 L ← This is what gets saved
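The example calculation above can be reproduced in a few lines of Python. This is only an illustrative sketch: the worker performs the subtraction in SQL, and the function name and the use of `Decimal` are assumptions made here to keep the printed result free of floating-point noise.

```python
from decimal import Decimal


def instant_water_flow(previous, current):
    """Return the volume that passed between two accumulated meter readings."""
    # Going through str() and Decimal avoids results like 1.7999999999999972
    return Decimal(str(current)) - Decimal(str(previous))


print(instant_water_flow(100.5, 102.3))  # 1.8
```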
What it does: Sends calculated data to external IoT platforms
Features:
konteau (Raw Water Data)
- water_flow: Accumulated water total (m³ or L)
- water_pressure: Water pressure (kPa)
- water_temperature: Water temperature (°C)
- instant_flow_rate: Current flow rate (m³/h)

calculated_data (Processed Results)
- instant_water_flow: How much water flowed since last reading
- irrigation_round_total: Total water in current irrigation session
- daily_irrigation: Total water used today (midnight to midnight)

device_list (Device Info)
- iothub_url: Where to send the data
- iothub_serial: Device identifier
- device_subtype: KONTEAU_20, KONTEAU_50, KONTEAU_100, KONTEAU_150, KONTEAU_P, KONTEAU_RKV, etc.

# KONTEAU_20 devices: Everything in LITERS (L)
if device_subtype == 'KONTEAU_20':
    unit = 'Liters'
# ALL OTHER KONTEAU devices: Everything in CUBIC METERS (m³)
elif device_subtype.startswith('KONTEAU_') and device_subtype != 'KONTEAU_20':
    unit = 'Cubic meters'
# When water flows (instant_water_flow > 0):
irrigation_round_total += instant_water_flow # Keep adding
# When water stops (instant_water_flow = 0):
irrigation_round_total = 0 # Reset to zero - irrigation finished
-- This is the most important line in the entire system:
instant_water_flow = last_row.water_flow - previous_row.water_flow
For KONTEAU_RKV and KONTEAU_20 devices only:
instant_flow_rate = ((Vn+1 - Vn) / (Tn+1 - Tn)) × 3600
Formula explanation:
- Vn+1 - Vn: Volume difference between current and previous reading
- Tn+1 - Tn: Time difference in seconds between readings
- × 3600: Convert per-second rate to per-hour rate
- The result is written to the konteau.instant_flow_rate column

Device behavior:
| Time | Water Reading | Instant Flow | Round Total | Status |
|---|---|---|---|---|
| 10:00 | 100.0 L | 0.0 L | 0.0 L | No irrigation |
| 10:01 | 102.5 L | 2.5 L | 2.5 L | Irrigation starts |
| 10:02 | 104.8 L | 2.3 L | 4.8 L | Irrigation continues |
| 10:03 | 107.1 L | 2.3 L | 7.1 L | Irrigation continues |
| 10:04 | 109.2 L | 2.1 L | 9.2 L | Irrigation continues |
| 10:05 | 109.2 L | 0.0 L | 0.0 L | Irrigation ends (RESET) |
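The table can be regenerated from the raw meter readings alone. The sketch below is an illustration under stated assumptions: the function name is hypothetical, values are rounded to three decimals to suppress floating-point noise, and a non-positive difference is treated the same way as zero flow.

```python
def irrigation_rounds(readings):
    """Yield (instant_flow, round_total) for each reading after the first."""
    round_total = 0.0
    for previous, current in zip(readings, readings[1:]):
        instant = round(current - previous, 3)
        if instant > 0:
            # Water is flowing: keep adding to the current irrigation round
            round_total = round(round_total + instant, 3)
        else:
            # Flow stopped (assumption: negative differences also end the round)
            round_total = 0.0
        yield instant, round_total


readings = [100.0, 102.5, 104.8, 107.1, 109.2, 109.2]  # 10:00 through 10:05
for instant, total in irrigation_rounds(readings):
    print(f"instant={instant:.1f} L  round_total={total:.1f} L")
```

The first reading (10:00) has no predecessor, so the generator starts with the 10:01 row.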
{
"serial": "device_serial_from_db",
"timestamp": "2024-01-01 10:05:00",
"instant_water_flow": 0.0,
"accumulated_water_flow": 109.2,
"accumulated_water_flow_rev": 0.0,
"instant_flow_rate": 2.5,
"water_temperature": 22.5,
"irrigation_round": 0.0,
"water_pressure": 2.1
}
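A payload like the one above could be assembled from one row of the worker's main query roughly as follows. The mapping from column names to payload keys is inferred from the names and is an assumption, as are the helper names `build_payload` and `send_payload`, the JSON POST format, and the use of the standard library's `urllib` (the worker's log output suggests it uses a requests-style client instead).

```python
import json
import urllib.request
from datetime import datetime


def build_payload(row, irrigation_round):
    """Map one processed reading onto the payload keys (assumed mapping)."""
    return {
        "serial": row["iothub_serial"],
        "timestamp": row["timestamp_data"].strftime("%Y-%m-%d %H:%M:%S"),
        "instant_water_flow": row["instant_water_flow"],
        "accumulated_water_flow": row["water_flow"],
        "accumulated_water_flow_rev": row["water_flow_rev"],
        "instant_flow_rate": row["instant_flow_rate"],
        "water_temperature": row["water_temperature"],
        "irrigation_round": irrigation_round,
        "water_pressure": row["water_pressure"],
    }


def send_payload(url, payload, timeout=10):
    """POST the payload as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


example_row = {
    "iothub_serial": "device_serial_from_db",
    "timestamp_data": datetime(2024, 1, 1, 10, 5, 0),
    "instant_water_flow": 0.0,
    "water_flow": 109.2,
    "water_flow_rev": 0.0,
    "instant_flow_rate": 2.5,
    "water_temperature": 22.5,
    "water_pressure": 2.1,
}
payload = build_payload(example_row, irrigation_round=0.0)
print(json.dumps(payload, indent=2))
```

`send_payload` is deliberately not called here; it would be invoked with the device's `iothub_url` once the payload is built.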
Check:
- listen_for_new_row running?

Check:

Check:
- iothub_url is filled in device_list
- iothub_serial is correct

Check:
- instant_water_flow actually becomes 0
- cumulative_water_flows global variable

🚀 Starting compute_waterflow_diff task... ← Task started
🔍 Executing query to get iothub_url... ← Getting device config
📊 Fetched main query results: [...] ← Got the data
📝 Updated cumulative water flow for ID X: Y ← Updated irrigation round
📤 Prepared data to send: {...} ← Ready to send
📨 Request sent successfully: <Response [200]> ← API call worked
❌ An error occurred while sending... ← Something failed
-- Check latest readings
SELECT * FROM konteau WHERE device_list_id = X ORDER BY id DESC LIMIT 5;
-- Check calculated results
SELECT * FROM calculated_data WHERE device_list_id = X ORDER BY id DESC LIMIT 5;
-- Check device configuration
SELECT iothub_url, iothub_serial, device_subtype FROM device_list WHERE id = X;
POSTGRES_USER # Database username
POSTGRES_DB # Database name
POSTGRES_PASSWORD # Database password
POSTGRES_HOST # Database host
POSTGRES_PORT # Database port
CELERY_BROKER_URL # Message broker URL
CELERY_RESULT_BACKEND # Result backend URL
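As an illustration of how the database variables fit together, the sketch below builds a PostgreSQL connection URI from them and fails early when one is missing. The helper name `build_postgres_dsn` and the fail-fast behavior are assumptions; the worker may read these variables differently.

```python
import os
from urllib.parse import quote

REQUIRED = (
    "POSTGRES_USER",
    "POSTGRES_PASSWORD",
    "POSTGRES_HOST",
    "POSTGRES_PORT",
    "POSTGRES_DB",
)


def build_postgres_dsn(env=None):
    """Build a postgresql:// URI from the environment, or raise if incomplete."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # Percent-encode credentials so characters such as '@' or ':' stay valid
    user = quote(env["POSTGRES_USER"], safe="")
    password = quote(env["POSTGRES_PASSWORD"], safe="")
    host, port, db = env["POSTGRES_HOST"], env["POSTGRES_PORT"], env["POSTGRES_DB"]
    return f"postgresql://{user}:{password}@{host}:{port}/{db}"
```

Passing a plain dictionary instead of `os.environ` makes the function easy to check without touching the real environment.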
Remember: This worker is the foundation of the entire irrigation monitoring system. If this breaks, farmers can't see their water usage, which is critical for agriculture.
Variable: cumulative_water_flows (List of dictionaries)
Structure:
[{
    'device_list_id': int,
    'cumulative_water_flow': float,
    'id': int  # calculated_data record ID
}]
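A list with this structure might be maintained per device along the following lines. This is a sketch, not the worker's code: the function name `update_cumulative` is hypothetical, and it assumes one entry per `device_list_id` whose `id` always points at the latest calculated_data record.

```python
def update_cumulative(state, device_list_id, record_id, instant_water_flow):
    """Update the entry for one device and return its running round total."""
    entry = next((e for e in state if e["device_list_id"] == device_list_id), None)
    if entry is None:
        # First reading seen for this device: start it at zero
        entry = {
            "device_list_id": device_list_id,
            "cumulative_water_flow": 0.0,
            "id": record_id,
        }
        state.append(entry)
    entry["id"] = record_id
    if instant_water_flow > 0:
        entry["cumulative_water_flow"] += instant_water_flow  # round continues
    else:
        entry["cumulative_water_flow"] = 0.0  # round finished: reset
    return entry["cumulative_water_flow"]


cumulative_water_flows = []
update_cumulative(cumulative_water_flows, device_list_id=7, record_id=1, instant_water_flow=2.5)
update_cumulative(cumulative_water_flows, device_list_id=7, record_id=2, instant_water_flow=2.0)
print(cumulative_water_flows)
```

Because the list lives in process memory, its contents are lost when the worker process restarts and are not shared between separate worker processes.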
Logic:
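- instant_water_flow == 0: the device's cumulative_water_flow is reset to 0 (irrigation finished)
- instant_water_flow > 0: instant_water_flow is added to the device's cumulative_water_flow

The worker uses this complex query to get data and calculate differences: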
-- Newest reading across all devices
WITH last_row AS (
    SELECT id, device_list_id, water_flow, timestamp_data, water_flow_rev, instant_flow_rate, water_temperature, water_pressure
    FROM public.konteau
    ORDER BY id DESC
    LIMIT 1
), previous_row AS (
    -- Reading immediately before it for the same device
    SELECT id, device_list_id, water_flow, timestamp_data
    FROM public.konteau
    WHERE device_list_id = (SELECT device_list_id FROM last_row)
      AND id < (SELECT id FROM last_row)
    ORDER BY id DESC
    LIMIT 1
), iothub_data AS (
    -- Delivery settings and subtype of that device
    SELECT device_list.iothub_url, device_list.id, device_list.iothub_serial, device_list.device_subtype
    FROM device_list
    WHERE device_list.id = (SELECT device_list_id FROM last_row)
    ORDER BY device_list.id DESC
    LIMIT 1
), update_instant_flow AS (
    -- Hourly flow rate, computed only for KONTEAU_RKV and KONTEAU_20
    UPDATE public.konteau
    SET instant_flow_rate =
        CASE
            WHEN EXTRACT(EPOCH FROM (last_row.timestamp_data - previous_row.timestamp_data)) > 0 THEN
                ROUND(
                    CAST(
                        ((last_row.water_flow - previous_row.water_flow) /
                         EXTRACT(EPOCH FROM (last_row.timestamp_data - previous_row.timestamp_data))) * 3600
                    AS numeric), 2)
            ELSE 0
        END
    FROM last_row, previous_row, iothub_data
    WHERE konteau.id = last_row.id
      AND iothub_data.device_subtype IN ('KONTEAU_RKV', 'KONTEAU_20')
    RETURNING 1
), insert_data AS (
    -- Store the difference between the two readings; nothing is inserted
    -- when the device has no previous reading
    INSERT INTO public.calculated_data(device_list_id, timestamp_data, instant_water_flow)
    SELECT last_row.device_list_id, last_row.timestamp_data, last_row.water_flow - previous_row.water_flow
    FROM last_row, previous_row
    RETURNING device_list_id, timestamp_data, instant_water_flow
)
-- All CTEs read the same snapshot, so last_row.instant_flow_rate below is
-- the value from before update_instant_flow ran
SELECT
    insert_data.device_list_id,
    insert_data.timestamp_data,
    insert_data.instant_water_flow,
    iothub_data.iothub_url,
    iothub_data.iothub_serial,
    last_row.water_flow,
    last_row.water_flow_rev,
    last_row.instant_flow_rate,
    last_row.water_temperature,
    last_row.water_pressure
FROM insert_data, iothub_data, last_row
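The CASE expression in the update_instant_flow CTE can be mirrored in Python to check values by hand. This is a sketch for verification only: the function name is hypothetical, and Python's float rounding may differ from PostgreSQL's numeric ROUND in rare tie cases.

```python
from datetime import datetime


def instant_flow_rate(prev_volume, prev_time, last_volume, last_time):
    """Return the hourly flow rate between two readings, rounded to 2 decimals."""
    elapsed_seconds = (last_time - prev_time).total_seconds()
    if elapsed_seconds <= 0:
        return 0.0  # same ELSE branch as the SQL: no usable time difference
    return round((last_volume - prev_volume) / elapsed_seconds * 3600, 2)


rate = instant_flow_rate(
    100.0, datetime(2024, 1, 1, 10, 0, 0),
    102.5, datetime(2024, 1, 1, 10, 1, 0),
)
print(rate)  # 150.0 (2.5 units in 60 s, scaled to one hour)
```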
This documentation describes the functionality of celery_worker.py.