如何用 LLM 处理非结构化数据?Google 洪水预测的 6 个核心技术实战解析
引言
2026 年 3 月,Google 研究团队发布了一项创新性的洪水预测系统,该系统使用 Gemini 大语言模型分析了 500 万篇新闻报道,提取出 260 万次洪水事件数据,成功构建了全球洪水预测模型。这项技术展示了 LLM 在非结构化数据处理领域的巨大潜力。
对于开发者而言,这个案例提供了宝贵的实战经验:如何将 LLM 与传统机器学习模型结合,从文本数据中构建可量化的预测系统。本文将深入解析 Google 的技术方案,并提供 6 个可落地的实战技巧,帮助开发者在自己的项目中应用类似方法。
技术背景:为什么需要 LLM 处理非结构化数据?
数据稀缺问题
在许多领域,结构化数据极其稀缺:
- 气象领域:洪水事件太短暂和局部化,无法像温度或河流流量那样被全面监测
- 医疗领域:罕见病例记录分散在医生笔记和病历中
- 金融领域:市场情绪隐藏在新闻、社交媒体和分析师报告中
- 制造业:设备故障前兆记录在维修日志和操作员笔记中
Google 的解决方案提供了一个通用范式:使用 LLM 将定性描述转化为定量数据。
Groundsource 数据集架构
Google 创建的”Groundsource”数据集包含以下核心字段:
{
"event_id": "flood_2024_001234",
"location": {
"latitude": 23.8103,
"longitude": 90.4125,
"region": "Dhaka, Bangladesh"
},
"timestamp": "2024-07-15T08:30:00Z",
"severity": "high",
"affected_area_km2": 45.6,
"casualties": 12,
"source_article": {
"url": "https://example.com/news/123",
"publication_date": "2024-07-15",
"language": "en"
},
"extraction_confidence": 0.94
}
核心技术 1:LLM 信息提取提示词工程
基础提取框架
Google 使用的提示词结构包含以下关键元素:
EXTRACTION_PROMPT = """
你是一名专业的气象数据分析师。请从以下新闻报道中提取洪水事件的结构化信息。
**输入文本**:
{article_text}
**提取要求**:
1. 地理位置:提取具体的城市、地区、国家名称,如无法获取精确坐标则标注为"unknown"
2. 时间信息:提取事件发生的具体日期和时间,格式为 ISO 8601
3. 严重程度:根据描述判断为 low/medium/high/critical
4. 影响范围:提取受影响区域描述(如"整个村庄"、"三个街区"等)
5. 人员伤亡:提取具体数字,如未提及则标注为 null
6. 置信度评分:对提取信息的准确性评分(0.0-1.0)
**输出格式**:
请严格按照以下 JSON 格式输出,不要包含任何额外文本:
{
"location": {"name": "", "country": "", "coordinates": null},
"timestamp": "",
"severity": "",
"affected_area": "",
"casualties": null,
"confidence": 0.0
}
**注意事项**:
- 如果文本中没有明确的洪水事件,返回 {"event_detected": false}
- 对于模糊的时间描述(如"上周"),根据文章发布日期推算具体日期
- 对于多个事件,分别提取并返回数组
"""
实战技巧:多轮验证提取
单次提取可能存在误差,Google 采用了多轮验证策略:
def multi_round_extraction(article_text, llm_client):
# 第一轮:初步提取
initial_result = llm_client.extract(
prompt=EXTRACTION_PROMPT.format(article_text=article_text)
)
# 第二轮:验证提取
validation_prompt = f"""
请验证以下提取结果的准确性:
原始文本:{article_text[:500]}...
提取结果:{initial_result}
请指出:
1. 是否有遗漏的关键信息
2. 是否有错误的地理或时间信息
3. 置信度评分是否合理
输出修正后的 JSON 结果。
"""
validated_result = llm_client.extract(prompt=validation_prompt)
return validated_result
核心技术 2:地理实体解析与坐标转换
地名标准化挑战
新闻报道中的地理位置描述通常模糊不清:
- “孟加拉国首都附近”
- “达卡市区北部”
- “恒河三角洲地区”
Google 使用多层解析策略:
from geopy.geocoders import Nominatim
import googlemaps
class LocationResolver:
def __init__(self):
self.nominatim = Nominatim(user_agent="flood_prediction")
self.gmaps = googlemaps.Client(key=API_KEY)
def resolve_location(self, location_description, country_hint=None):
"""
多层地理位置解析
"""
# 第一层:尝试精确匹配
result = self._try_exact_match(location_description)
if result:
return result
# 第二层:使用国家提示进行模糊匹配
if country_hint:
result = self._try_country_context(location_description, country_hint)
if result:
return result
# 第三层:使用 LLM 推断可能的位置
result = self._llm_infer_location(location_description)
return result
def _try_exact_match(self, description):
try:
location = self.nominatim.geocode(description)
if location:
return {
"name": description,
"latitude": location.latitude,
"longitude": location.longitude,
"confidence": 0.9
}
except:
pass
return None
def _llm_infer_location(self, description):
"""使用 LLM 推断模糊位置"""
prompt = f"""
请将以下地理位置描述转换为最可能的具体位置:
描述:{description}
请提供:
1. 最可能的城市/地区名称
2. 推测的国家
3. 推测的置信度(0.0-1.0)
输出 JSON 格式。
"""
return self.llm_client.extract(prompt)
核心技术 3:时间表达式标准化
相对时间解析
新闻中常见的时间表达:
- “昨天下午”
- “上周三”
- “本月早些时候”
- “雨季期间”
from dateutil import parser
from datetime import datetime, timedelta
import re
class TimeNormalizer:
def __init__(self, reference_date=None):
self.reference_date = reference_date or datetime.now()
def normalize(self, time_expression, article_date=None):
"""
将自然语言时间表达式转换为 ISO 8601 格式
"""
ref_date = article_date or self.reference_date
# 处理相对时间
relative_patterns = {
r'昨天': timedelta(days=1),
r'前天': timedelta(days=2),
r'上周': timedelta(weeks=1),
r'本月': timedelta(days=0), # 需要更多上下文
r'前几天': timedelta(days=3),
}
for pattern, delta in relative_patterns.items():
if re.search(pattern, time_expression):
return (ref_date - delta).isoformat()
# 尝试标准解析
try:
parsed = parser.parse(time_expression, fuzzy=True)
return parsed.isoformat()
except:
pass
# 使用 LLM 解析复杂表达
return self._llm_parse_time(time_expression, ref_date)
def _llm_parse_time(self, expression, ref_date):
prompt = f"""
参考日期:{ref_date.isoformat()}
请将以下时间表达转换为具体日期:
"{expression}"
输出 ISO 8601 格式日期。如果无法确定,返回 null。
"""
return self.llm_client.extract(prompt)
核心技术 4:数据质量验证与去重
跨源验证
同一事件可能被多家媒体报道,需要去重和验证:
import hashlib
from sklearn.cluster import DBSCAN
import numpy as np
class EventDeduplicator:
def __init__(self):
self.events = []
def add_event(self, event):
"""添加事件并进行去重检查"""
similar_events = self._find_similar_events(event)
if similar_events:
# 合并相似事件的信息
merged_event = self._merge_events(similar_events + [event])
for e in similar_events:
self.events.remove(e)
self.events.append(merged_event)
else:
self.events.append(event)
def _find_similar_events(self, new_event, time_window_days=3, distance_km=50):
"""查找相似事件"""
similar = []
for event in self.events:
# 时间接近性检查
time_diff = abs(
datetime.fromisoformat(new_event['timestamp']) -
datetime.fromisoformat(event['timestamp'])
).days
if time_diff > time_window_days:
continue
# 地理位置接近性检查
distance = self._haversine_distance(
new_event['location'],
event['location']
)
if distance < distance_km:
similar.append(event)
return similar
def _haversine_distance(self, loc1, loc2):
"""计算两点之间的球面距离(公里)"""
from math import radians, sin, cos, sqrt, atan2
R = 6371 # 地球半径(公里)
lat1, lon1 = radians(loc1['latitude']), radians(loc1['longitude'])
lat2, lon2 = radians(loc2['latitude']), radians(loc2['longitude'])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
def _merge_events(self, events):
"""合并多个相似事件的信息"""
merged = events[0].copy()
merged['sources'] = [e.get('source', 'unknown') for e in events]
merged['confidence'] = min(0.99, sum(e['confidence'] for e in events) / len(events))
return merged
核心技术 5:LLM 与 LSTM 模型融合
架构设计
Google 的模型架构结合了两者的优势:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 新闻报道文本 │────▶│ LLM 特征提取 │────▶│ 结构化事件数据 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 天气预报数据 │────▶│ LSTM 预测模型 │◀────│ Groundsource │
└─────────────────┘ └─────────────────┘ │ 数据集 │
└─────────────────┘
│
▼
┌─────────────────┐
│ 洪水风险预测 │
└─────────────────┘
实现代码
import tensorflow as tf
from tensorflow import keras
import numpy as np
class FloodPredictionModel:
def __init__(self, input_shape, llm_features_dim=128):
self.input_shape = input_shape
self.llm_features_dim = llm_features_dim
self.model = self._build_model()
def _build_model(self):
"""构建 LSTM 预测模型"""
model = keras.Sequential([
# LSTM 层处理时间序列数据
keras.layers.LSTM(
128,
return_sequences=True,
input_shape=self.input_shape
),
keras.layers.Dropout(0.3),
keras.layers.LSTM(64),
keras.layers.Dropout(0.3),
# 融合 LLM 提取的特征
keras.layers.Dense(64, activation='relu'),
# 输出层:洪水概率
keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='binary_crossentropy',
metrics=['accuracy', keras.metrics.AUC(name='auc')]
)
return model
def prepare_features(self, weather_data, llm_extracted_features):
"""
准备模型输入特征
weather_data: 时间序列气象数据(降雨量、温度、湿度等)
llm_extracted_features: LLM 从新闻中提取的特征向量
"""
# 标准化气象数据
weather_normalized = self._normalize_weather(weather_data)
# 融合 LLM 特征
combined_features = np.concatenate([
weather_normalized,
llm_extracted_features
], axis=-1)
return combined_features
def train(self, X_train, y_train, X_val, y_val, epochs=100):
"""训练模型"""
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
history = self.model.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=epochs,
batch_size=32,
callbacks=[early_stopping]
)
return history
def predict(self, weather_data, llm_features):
"""预测洪水概率"""
features = self.prepare_features(weather_data, llm_features)
probability = self.model.predict(features)
return probability[0][0]
核心技术 6:实时数据管道与 API 设计
流式数据处理架构
from kafka import KafkaConsumer, KafkaProducer
import json
from datetime import datetime
class RealTimeFloodPipeline:
def __init__(self, config):
self.consumer = KafkaConsumer(
'news-feed',
bootstrap_servers=config['kafka_servers'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=config['kafka_servers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.llm_client = LLMClient(config['llm_api_key'])
self.prediction_model = FloodPredictionModel.load(config['model_path'])
def run(self):
"""运行实时处理管道"""
for message in self.consumer:
try:
article = message.value
# 步骤 1: LLM 信息提取
extracted_data = self.llm_client.extract_flood_event(
article['text']
)
if not extracted_data.get('event_detected'):
continue
# 步骤 2: 地理位置解析
location = self.location_resolver.resolve(
extracted_data['location']
)
# 步骤 3: 获取实时气象数据
weather_data = self.weather_api.get_forecast(
location['latitude'],
location['longitude'],
days=7
)
# 步骤 4: 洪水风险预测
risk_score = self.prediction_model.predict(
weather_data,
extracted_data['features']
)
# 步骤 5: 发布预测结果
prediction = {
'event_id': self._generate_id(),
'location': location,
'timestamp': datetime.now().isoformat(),
'flood_risk': float(risk_score),
'risk_level': self._classify_risk(risk_score),
'confidence': extracted_data['confidence']
}
self.producer.send('flood-predictions', value=prediction)
# 步骤 6: 高风险事件告警
if risk_score > 0.7:
self._send_alert(prediction)
except Exception as e:
print(f"处理错误:{e}")
continue
def _classify_risk(self, score):
if score > 0.8:
return 'critical'
elif score > 0.6:
return 'high'
elif score > 0.4:
return 'medium'
else:
return 'low'
def _send_alert(self, prediction):
"""发送高风险告警"""
alert = {
'type': 'flood_risk_alert',
'priority': 'high',
'location': prediction['location'],
'risk_score': prediction['flood_risk'],
'timestamp': prediction['timestamp']
}
self.producer.send('alerts', value=alert)
实战案例:构建自己的事件提取系统
完整示例代码
以下是一个简化版的新闻事件提取系统:
# install: pip install openai geopy python-dateutil kafka-python
import os
import json
from openai import OpenAI
from geopy.geocoders import Nominatim
from datetime import datetime
class NewsEventExtractor:
def __init__(self, openai_api_key):
self.client = OpenAI(api_key=openai_api_key)
self.geolocator = Nominatim(user_agent="event_extractor")
def extract_event(self, article_text, event_type="flood"):
"""从新闻文章中提取事件信息"""
prompt = f"""
你是一名专业的事件数据分析师。请从以下新闻报道中提取{event_type}事件的结构化信息。
**输入文本**:
{article_text[:3000]} # 限制长度以节省 token
**提取要求**:
1. 事件类型确认:确认是否为{event_type}事件
2. 地理位置:城市、地区、国家
3. 时间:事件发生的具体日期
4. 严重程度:low/medium/high/critical
5. 关键数字:伤亡人数、受影响人数、经济损失等
6. 信息来源:媒体名称、发布日期
**输出格式**(严格 JSON):
{{
"event_detected": true/false,
"event_type": "{event_type}",
"location": {{
"description": "",
"city": "",
"country": "",
"coordinates": {{ "lat": null, "lon": null }}
}},
"timestamp": "ISO 8601 格式",
"severity": "low/medium/high/critical",
"metrics": {{
"casualties": null,
"affected_people": null,
"economic_loss": null
}},
"source": {{
"media": "",
"publish_date": ""
}},
"confidence": 0.0-1.0
}}
"""
response = self.client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[
{"role": "system", "content": "你是专业的事件数据分析师,只输出 JSON 格式。"},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"},
temperature=0.1
)
result = json.loads(response.choices[0].message.content)
# 地理编码
if result['event_detected'] and result['location']['description']:
try:
location = self.geolocator.geocode(result['location']['description'])
if location:
result['location']['coordinates'] = {
'lat': location.latitude,
'lon': location.longitude
}
except:
pass
return result
def batch_extract(self, articles, event_type="flood"):
"""批量处理文章"""
results = []
for i, article in enumerate(articles):
print(f"处理文章 {i+1}/{len(articles)}")
result = self.extract_event(article, event_type)
results.append(result)
return results
# 使用示例
if __name__ == "__main__":
extractor = NewsEventExtractor(openai_api_key=os.getenv('OPENAI_API_KEY'))
sample_article = """
孟加拉国达卡地区昨日发生严重洪水,超过 10 万人受到影响。
连续三天的暴雨导致恒河水位上涨,多个村庄被淹没。
当地政府已疏散约 5 万名居民,目前确认有 3 人遇难。
救援工作正在进行中,预计未来一周降雨将持续。
"""
result = extractor.extract_event(sample_article, event_type="flood")
print(json.dumps(result, indent=2, ensure_ascii=False))
最佳实践与注意事项
1. 成本控制策略
LLM 调用成本可能很高,建议采用以下优化:
# 分级处理策略
def cost_optimized_extraction(articles):
# 第一级:使用规则过滤明显不相关的文章
relevant = [a for a in articles if has_keywords(a, ['洪水', '暴雨', '淹没'])]
# 第二级:使用小模型进行初步分类
likely_events = [a for a in relevant if small_model_classify(a) > 0.5]
# 第三级:只对高概率文章使用大模型详细提取
results = [llm_extract(a) for a in likely_events]
return results
2. 数据隐私与合规
- 遵守新闻内容的版权和使用条款
- 对个人信息进行脱敏处理
- 遵循 GDPR 等数据保护法规
3. 模型监控与迭代
class ModelMonitor:
def __init__(self):
self.extraction_log = []
def log_extraction(self, input_text, output, human_feedback=None):
"""记录提取结果用于后续分析"""
self.extraction_log.append({
'timestamp': datetime.now().isoformat(),
'input_length': len(input_text),
'output': output,
'human_feedback': human_feedback
})
def analyze_accuracy(self):
"""分析提取准确率"""
with_feedback = [r for r in self.extraction_log if r['human_feedback']]
if not with_feedback:
return None
accuracy = sum(
1 for r in with_feedback
if r['human_feedback'] == 'correct'
) / len(with_feedback)
return accuracy
def get_improvement_suggestions(self):
"""获取模型改进建议"""
errors = [
r for r in self.extraction_log
if r.get('human_feedback') == 'incorrect'
]
# 分析常见错误模式
return self._analyze_error_patterns(errors)
扩展应用场景
Google 的技术方案可以应用于多个领域:
1. 金融舆情分析
- 从新闻中提取公司负面事件
- 预测股票价格波动
- 监控行业风险
2. 公共卫生监测
- 从社交媒体提取疾病爆发信息
- 预测疫情传播趋势
- 辅助医疗资源调配
3. 供应链风险管理
- 从新闻中提取可能影响供应链的事件
- 预测物流中断风险
- 优化库存策略
4. 网络安全威胁情报
- 从技术论坛提取漏洞信息
- 预测网络攻击趋势
- 自动化威胁评估
总结
Google 的洪水预测系统展示了 LLM 与传统 ML 模型结合的强大能力。对于开发者而言,关键学习点包括:
- LLM 作为数据转换层:将非结构化文本转换为结构化数据
- 多模型融合架构:LLM 负责理解,传统 ML 负责预测
- 数据质量保障:多轮验证、去重、地理编码
- 实时处理能力:流式数据管道设计
- 成本优化:分级处理、缓存策略
- 可扩展性:模块化设计,易于应用到其他领域
这套方法论不仅适用于洪水预测,也可以迁移到任何需要从文本数据构建预测系统的场景。随着 LLM 能力的不断提升和成本的持续下降,这种技术范式将在更多领域得到应用。
参考资源
本文基于 Google 研究团队公开的技术方案和论文编写,代码示例仅供参考,实际部署时请根据具体需求进行调整和优化。