

文|祝融
编辑|郭嘉

-
结构化数据载荷
class ProcessorPart:
    """Structured data payload that flows between processors.

    Only the field annotations are declared here. NOTE(review): other
    snippets in this document construct instances with keyword arguments,
    so the real class presumably provides a constructor (for example via
    ``@dataclass``) -- confirm against the library.
    """

    content: ProcessorContent  # the actual data payload
    metadata: Dict[str, Any]  # metadata dictionary
    mime_type: str  # MIME type identifier
    timestamp: float  # timestamp
    sequence_id: str  # sequence identifier
-
异步流处理能力:库提供了用于分割、连接和合并 ProcessorParts 异步流的实用工具。这意味着数据可以在不阻塞主线程的情况下连续处理:
async def process_stream(
    input_stream: AsyncIterator[ProcessorPart],
) -> AsyncIterator[ProcessorPart]:
    """Transform an async stream of parts one item at a time.

    Each part is awaited through ``transform_part`` and yielded as soon as
    it is ready, so the consumer can start working before the input stream
    is exhausted.

    Args:
        input_stream: Async iterator producing the parts to transform.

    Yields:
        The value returned by ``transform_part`` for each input part, in
        input order.
    """
    async for part in input_stream:
        # Process each part.
        processed_part = await transform_part(part)
        yield processed_part
-
双向流控制:与传统的单向数据流不同,GenAI Processors 支持双向流控制,允许下游处理器向上游发送反馈信息。
class BidirectionalProcessor:
    """Processor that consumes an input stream together with a feedback stream."""

    async def process(self, input_stream, feedback_stream):
        """Pair input parts with feedback parts and yield the handled results.

        The two async streams are advanced in lock-step: one item is pulled
        from ``input_stream``, then one from ``feedback_stream``, and the
        pair is passed to ``self.handle_with_feedback``. Iteration stops as
        soon as either stream is exhausted.

        Args:
            input_stream: Async iterable of input parts.
            feedback_stream: Async iterable of feedback parts.

        Yields:
            The awaited result of ``self.handle_with_feedback`` for each pair.
        """
        # The built-in zip() only accepts synchronous iterables and returns a
        # synchronous iterator, so it cannot be used with ``async for``.
        # Drive both async iterators by hand instead.
        input_iter = input_stream.__aiter__()
        feedback_iter = feedback_stream.__aiter__()
        while True:
            try:
                input_part = await input_iter.__anext__()
                feedback_part = await feedback_iter.__anext__()
            except StopAsyncIteration:
                return
            # Handle the input together with its feedback.
            result = await self.handle_with_feedback(input_part, feedback_part)
            yield result

class Processor(ABC):
    """Abstract base class for stream processors.

    Subclasses implement ``process``; instances are callable, so
    ``processor(stream)`` is shorthand for ``processor.process(stream)``.
    """

    @abstractmethod
    async def process(
        self, input_stream: AsyncIterator[ProcessorPart]
    ) -> AsyncIterator[ProcessorPart]:
        """Consume ``input_stream`` and produce a stream of output parts.

        Args:
            input_stream: Async iterator of parts to process.
        """
        pass

    def __call__(self, input_stream):
        """Delegate to ``process`` so the processor can be called directly."""
        return self.process(input_stream)
# Build the processing chain.
audio_processor = AudioTranscriber()
text_processor = TextAnalyzer()
response_generator = ResponseGenerator()


# Chained processing.
async def process_audio_input(audio_stream):
    """Run an audio stream through transcription, analysis and response generation.

    Each stage is called with the output of the previous one.

    Args:
        audio_stream: The stream handed to ``audio_processor``.

    Returns:
        Whatever ``response_generator`` returns for the analyzed stream.
    """
    transcribed = audio_processor(audio_stream)
    analyzed = text_processor(transcribed)
    responses = response_generator(analyzed)
    return responses


- AudioProcessor: 处理音频数据的专用处理器
- TextProcessor: 文本处理和分析
- ImageProcessor: 图像和视频帧处理
- ModelProcessor: 与AI模型交互的处理器
- StreamSplitter: 将单一流分割为多个并行流
- StreamMerger: 合并多个流为单一输出
- FilterProcessor: 基于条件过滤数据
- TransformProcessor: 数据格式转换
# 1. Synchronous text processing
from genai_processors.models import GeminiTextProcessor

text_processor = GeminiTextProcessor(
    model_name="gemini-pro",
    api_key="your-api-key",  # placeholder value
    temperature=0.7,
    max_tokens=1000,
)


async def process_text_query(query: str):
    """Send a single text query through ``text_processor``.

    Args:
        query: The text to wrap in a ``ProcessorPart``.

    Returns:
        The ``content.text`` of the first response part. If the processor
        yields nothing, the function falls through and returns ``None``.
    """
    input_part = ProcessorPart(
        content=TextContent(query),
        metadata={"user_id": "123", "session_id": "abc"},
    )
    async for response_part in text_processor(async_iter([input_part])):
        # Only the first response part is used.
        return response_part.content.text


# 2. Live API streaming
from genai_processors.models import GeminiLiveProcessor

live_processor = GeminiLiveProcessor(
    model_name="gemini-live",
    api_key="your-api-key",  # placeholder value
    streaming=True,
    real_time_factor=1.0,
)


async def handle_live_audio(audio_stream):
    """Feed audio chunks to ``live_processor`` and yield the audio it returns.

    Every chunk is wrapped in its own ``ProcessorPart`` and sent as a
    one-element stream. Audio responses are yielded to the caller; text
    responses are printed as transcriptions and not yielded.

    Args:
        audio_stream: Async iterable of audio chunks.

    Yields:
        ``response.content.audio_data`` for each response whose content
        type is ``"audio"``.
    """
    async for audio_chunk in audio_stream:
        input_part = ProcessorPart(
            content=AudioContent(audio_chunk),
            metadata={"format": "wav", "sample_rate": 16000},
        )
        async for response in live_processor(async_iter([input_part])):
            if response.content.type == "audio":
                yield response.content.audio_data
            elif response.content.type == "text":
                print(f"Transcription: {response.content.text}")
class AsyncModelProcessor:
    """Processor that sends parts to a model endpoint over HTTP.

    Reads ``self.api_endpoint``, which is not assigned anywhere in this
    class and must be set before ``process_single`` is called.
    """

    async def process_batch(self, inputs: List[ProcessorPart]):
        """Process several inputs concurrently.

        Args:
            inputs: Parts to process.

        Returns:
            The results of ``process_single`` for each input, in input order.
        """
        # Process multiple inputs concurrently.
        tasks = [self.process_single(input_part) for input_part in inputs]
        results = await asyncio.gather(*tasks)
        return results

    async def process_single(self, input_part: ProcessorPart):
        """POST one part to ``self.api_endpoint`` and parse the JSON reply.

        Args:
            input_part: Part to send; serialized with ``to_dict()``.

        Returns:
            ``ProcessorPart.from_response`` applied to the decoded JSON body.
        """
        # Asynchronous API call.
        # NOTE(review): a new ClientSession is opened for every input; a
        # shared session would avoid repeated connection setup.
        async with aiohttp.ClientSession() as session:
            # Using the response as a context manager releases the
            # connection once the body has been read.
            async with session.post(
                self.api_endpoint, json=input_part.to_dict()
            ) as response:
                return ProcessorPart.from_response(await response.json())
# The + operator combines input sources and processing steps, creating a
# clear data flow.
import asyncio

from genai_processors import streams
from genai_processors.core import audio_io, live_model, video

# Input processor: combines camera streams and audio streams
input_processor = video.VideoIn() + audio_io.PyAudioIn(...)

# Output processor: plays the audio parts. Handles interruptions and pauses
# audio output when the user is speaking.
play_output = audio_io.PyAudioOut(...)

# Gemini Live API processor
live_processor = live_model.LiveProcessor(...)

# Compose the agent: mic+camera -> Gemini Live API -> play audio
live_agent = input_processor + live_processor + play_output


async def main():
    """Run the live agent and print every part it produces."""
    # ``async for`` is only valid inside a coroutine, so the loop lives here
    # rather than at module level.
    async for part in live_agent(streams.endless_stream()):
        # Process the output parts (e.g., print transcription, model output, metadata)
        print(part)


if __name__ == "__main__":
    asyncio.run(main())

class BatchProcessor:
    """Groups stream items into fixed-size batches before processing them.

    Relies on ``self.process_batch_items``, which is not defined in this
    class and must be provided (for example by a subclass).
    """

    def __init__(self, batch_size: int = 32, max_concurrency: int = 10):
        """Store the batch size and create the concurrency limiter.

        Args:
            batch_size: Number of items collected before a batch is processed.
            max_concurrency: Initial value of the semaphore that guards
                calls to ``process_batch_items``.
        """
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def _run_batch(self, batch):
        """Process one batch while holding the semaphore."""
        async with self.semaphore:
            return await self.process_batch_items(batch)

    async def process_batch(self, input_stream):
        """Collect items into batches and yield the processed results.

        A batch is processed whenever it reaches ``batch_size`` items; any
        remaining items are processed as a final, shorter batch once the
        input stream ends.

        Args:
            input_stream: Async iterable of items to batch.

        Yields:
            Each result returned by ``process_batch_items``, batch by batch.
        """
        batch = []
        async for item in input_stream:
            batch.append(item)
            if len(batch) >= self.batch_size:
                for result in await self._run_batch(batch):
                    yield result
                # Start a fresh list so the processed batch is not mutated.
                batch = []
        # Flush the trailing partial batch, if any.
        if batch:
            for result in await self._run_batch(batch):
                yield result
class CustomAudioProcessor(Processor):
    """Processor that transcribes audio parts with a loaded model."""

    def __init__(self, model_path: str, config: Dict[str, Any]):
        """Load the model and keep the configuration.

        Args:
            model_path: Path passed to ``load_model``.
            config: Settings dictionary; ``'target_sample_rate'`` is read
                during preprocessing.
        """
        self.model = load_model(model_path)
        self.config = config

    async def process(
        self, input_stream: AsyncIterator[ProcessorPart]
    ) -> AsyncIterator[ProcessorPart]:
        """Transcribe every audio part in the stream.

        Args:
            input_stream: Async iterator of parts whose content must be
                ``AudioContent``.

        Yields:
            A ``ProcessorPart`` with the transcription as ``TextContent``
            and the input metadata extended with ``'confidence'`` and
            ``'processing_time'``.

        Raises:
            ValueError: If a part's content is not ``AudioContent``.
        """
        async for part in input_stream:
            # Validate the input type.
            if not isinstance(part.content, AudioContent):
                raise ValueError(f"Expected AudioContent, got {type(part.content)}")

            audio_data = await self.preprocess_audio(part.content.audio_data)
            result = await self.model.predict(audio_data)

            # Build the output ProcessorPart.
            output_part = ProcessorPart(
                content=TextContent(result.transcription),
                metadata={
                    **part.metadata,
                    'confidence': result.confidence,
                    'processing_time': result.processing_time,
                },
            )
            yield output_part

    async def preprocess_audio(self, audio_data: bytes) -> np.ndarray:
        """Convert raw audio bytes into a normalized float32 array.

        Args:
            audio_data: Raw bytes, interpreted as 16-bit integer samples.

        Returns:
            Samples divided by 32768.0 as float32, resampled when
            ``config['target_sample_rate']`` is set to a truthy value.
        """
        # Audio preprocessing logic.
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # Normalize.
        audio_array = audio_array.astype(np.float32) / 32768.0
        # Resample to the target rate.
        if self.config.get('target_sample_rate'):
            audio_array = resample(audio_array, self.config['target_sample_rate'])
        return audio_array


# Advanced PartProcessor usage: for scenarios that need finer-grained
# control, PartProcessor can be used.
class AdvancedPartProcessor(PartProcessor):
    """Part-level processor that splits oversized content into chunks.

    Reads ``self.max_chunk_size``, which is not assigned in this class.
    """

    async def process_part(self, part: ProcessorPart) -> AsyncIterator[ProcessorPart]:
        """Process one part, chunking it first when it is too large.

        Args:
            part: The part to process.

        Yields:
            One processed chunk per split piece when ``part.content.size``
            exceeds ``self.max_chunk_size``; otherwise the single result of
            ``process_single_part``.
        """
        # Check whether large data needs to be split.
        if part.content.size > self.max_chunk_size:
            # Split into smaller chunks.
            chunks = await self.split_content(part.content)
            for i, chunk in enumerate(chunks):
                chunk_part = ProcessorPart(
                    content=chunk,
                    metadata={
                        **part.metadata,
                        'chunk_index': i,
                        'total_chunks': len(chunks),
                    },
                )
                processed_chunk = await self.process_chunk(chunk_part)
                yield processed_chunk
        else:
            # Process small data directly.
            result = await self.process_single_part(part)
            yield result



