18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class MixtureOfAgentsPack(BaseLlamaPack):
    """Layered "mixture of agents" pipeline built from several LLMs.

    For ``num_layers`` rounds every model in ``reference_llms`` answers the
    user query concurrently; from the second round on, each model is also
    shown the answers produced in the previous round. The main ``llm`` then
    receives the last round's answers and produces the final response.
    """

    # Instruction text placed in the system message, followed by the numbered
    # reference responses.
    _AGGREGATION_PROMPT = (
        "You have been provided with a set of responses from various "
        "open-source models to the latest user query. "
        "Your task is to synthesize these responses into a single, "
        "high-quality response. "
        "It is crucial to critically evaluate the information provided in "
        "these responses, recognizing that some of it may be biased or "
        "incorrect. "
        "Your response should not simply replicate the given answers but "
        "should offer a refined, accurate, and comprehensive reply to the "
        "instruction. "
        "Ensure your response is well-structured, coherent, and adheres to "
        "the highest standards of accuracy and reliability."
        "\nResponses from models:"
    )

    def __init__(
        self,
        llm: LLM,
        reference_llms: List[LLM],
        num_layers: int = 3,
        max_tokens: int = 2048,
        temperature: float = 0.7,
    ) -> None:
        """Store the pipeline configuration.

        Args:
            llm: Model that produces the final, synthesized answer.
            reference_llms: Models queried in every layer for reference
                answers. When empty, ``llm`` answers the query directly.
            num_layers: Number of reference rounds to run.
            max_tokens: Forwarded as ``max_tokens`` to every chat call.
            temperature: Forwarded as ``temperature`` to every chat call.
        """
        self.llm = llm
        self.reference_llms = reference_llms
        self.num_layers = num_layers
        self.max_tokens = max_tokens
        self.temperature = temperature

    def inject_references_to_messages(
        self,
        messages: List[ChatMessage],
        references: List[str],
    ) -> List[ChatMessage]:
        """Return a copy of ``messages`` carrying the reference responses.

        The aggregation prompt plus a 1-based numbered list of ``references``
        is appended to the leading system message when there is one;
        otherwise a new system message is prepended.

        Args:
            messages: Conversation so far. It is deep-copied, so the caller's
                list and message objects are left untouched.
            references: Reference responses to list in the system prompt.

        Returns:
            A new message list that includes the reference prompt.
        """
        messages = copy.deepcopy(messages)
        numbered = "".join(
            f"\n{index}. {reference}"
            for index, reference in enumerate(references, start=1)
        )
        system = self._AGGREGATION_PROMPT + numbered

        # Guard against an empty conversation: indexing messages[0] would
        # raise IndexError, so fall through to prepending instead.
        if messages and messages[0].role == "system":
            messages[0].content += "\n\n" + system
        else:
            messages = [ChatMessage(role="system", content=system), *messages]
        return messages

    async def agenerate_with_references(
        self,
        llm: LLM,
        messages: List[ChatMessage],
        references: List[str],
        max_tokens: int,
        temperature: float,
    ) -> str:
        """Chat with ``llm``, optionally showing it reference responses.

        Args:
            llm: Model to query via ``achat``.
            messages: Conversation to send.
            references: Reference responses; when non-empty they are injected
                into the system prompt before the call.
            max_tokens: Passed through to ``llm.achat``.
            temperature: Passed through to ``llm.achat``.

        Returns:
            ``str()`` of the chat response with surrounding whitespace
            stripped.
        """
        if references:
            messages = self.inject_references_to_messages(messages, references)
        response = await llm.achat(
            messages, max_tokens=max_tokens, temperature=temperature
        )
        # NOTE(review): relies on the response object's __str__; confirm it
        # yields only the message text for the LLM classes in use.
        return str(response).strip()

    async def get_answer(self, query_str: str) -> str:
        """Run all reference layers and return the final synthesized answer.

        Args:
            query_str: The user query.

        Returns:
            The answer produced by ``self.llm`` after it has been shown the
            last layer's reference responses (or no references at all when
            ``reference_llms`` is empty or ``num_layers`` is not positive).
        """
        messages = [ChatMessage(role="user", content=query_str)]

        # Holds the previous layer's outputs while a layer runs and the
        # latest outputs afterwards; the first layer therefore sees none.
        references: List[str] = []
        if self.reference_llms:
            for layer in range(self.num_layers):
                logger.info(
                    f"Round {layer+1}/{self.num_layers} to collecting reference responses."
                )
                jobs = [
                    self.agenerate_with_references(
                        llm=reference_llm,
                        messages=messages,
                        references=references,
                        max_tokens=self.max_tokens,
                        temperature=self.temperature,
                    )
                    for reference_llm in self.reference_llms
                ]
                # All reference models of one layer are queried concurrently.
                references = await asyncio.gather(*jobs)

        return await self.agenerate_with_references(
            llm=self.llm,
            messages=messages,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            references=references,
        )

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "llm": self.llm,
            "reference_llms": self.reference_llms,
            "num_layers": self.num_layers,
        }

    def run(self, query_str: str, **kwargs: Any) -> Any:
        """Run the pipeline.

        Drives ``get_answer`` with ``asyncio.run``, so it cannot be called
        from code that is already running inside an event loop. Extra keyword
        arguments are accepted but not used.
        """
        return asyncio.run(self.get_answer(query_str))