1pub mod kernels;
6pub mod pager;
7pub mod proxy;
8pub mod suspend;
9
10use crate::pager::{Pager, run_pager};
11use anyhow::{Error, anyhow};
12use fidl::Peered;
13use fidl::endpoints::{DiscoverableProtocolMarker, Proxy, ServerEnd};
14use fidl_fuchsia_component as fcomponent;
15use fidl_fuchsia_component_decl as fdecl;
16use fidl_fuchsia_component_runner as frunner;
17use fidl_fuchsia_io as fio;
18use fidl_fuchsia_starnix_container as fstarnix;
19use fidl_fuchsia_starnix_runner as fstarnixrunner;
20use fuchsia_async as fasync;
21use fuchsia_component::client as fclient;
22use fuchsia_inspect::{HistogramProperty, NumericProperty};
23use fuchsia_sync::Mutex;
24use futures::TryStreamExt;
25use kernels::Kernels;
26use log::warn;
27use proxy::ChannelProxy;
28use rand::Rng;
29use starnix_sync::{LockDepMutex, TerminalLock};
30use std::future::Future;
31use std::sync::Arc;
32use suspend::{
33 ASLEEP_SIGNAL, AWAKE_SIGNAL, SuspendContext, WakeSource, WakeSources, suspend_container,
34};
35
36const KERNEL_COLLECTION: &str = "kernels";
38
39const CONTAINER_RUNNER_PROTOCOL: &str = "fuchsia.starnix.container.Runner";
46
47#[allow(dead_code)]
48pub struct StarnixKernel {
49 name: String,
51
52 exposed_dir: fio::DirectoryProxy,
56
57 component_instance: zx::Event,
59
60 job: Arc<zx::Job>,
62
63 wake_lease: Mutex<Option<zx::EventPair>>,
65}
66
67impl StarnixKernel {
68 pub async fn create(
74 realm: fcomponent::RealmProxy,
75 kernel_url: &str,
76 start_info: frunner::ComponentStartInfo,
77 controller: ServerEnd<frunner::ComponentControllerMarker>,
78 ) -> Result<(Self, impl Future<Output = ()>), Error> {
79 let kernel_name = generate_kernel_name(&start_info)?;
80 let component_instance = start_info
81 .component_instance
82 .as_ref()
83 .ok_or_else(|| {
84 anyhow::anyhow!("expected to find component_instance in ComponentStartInfo")
85 })?
86 .duplicate_handle(zx::Rights::SAME_RIGHTS)?;
87
88 let (controller_proxy, controller_server_end) = fidl::endpoints::create_proxy();
90 realm
91 .create_child(
92 &fdecl::CollectionRef { name: KERNEL_COLLECTION.into() },
93 &fdecl::Child {
94 name: Some(kernel_name.clone()),
95 url: Some(kernel_url.to_string()),
96 startup: Some(fdecl::StartupMode::Lazy),
97 ..Default::default()
98 },
99 fcomponent::CreateChildArgs {
100 controller: Some(controller_server_end),
101 ..Default::default()
102 },
103 )
104 .await?
105 .map_err(|e| anyhow::anyhow!("failed to create kernel: {:?}", e))?;
106
107 let exposed_dir = open_exposed_directory(&realm, &kernel_name, KERNEL_COLLECTION).await?;
108 let container_runner = fclient::connect_to_named_protocol_at_dir_root::<
109 frunner::ComponentRunnerMarker,
110 >(&exposed_dir, CONTAINER_RUNNER_PROTOCOL)?;
111
112 container_runner.start(start_info, controller)?;
114
115 let container_controller =
117 fclient::connect_to_protocol_at_dir_root::<fstarnix::ControllerMarker>(&exposed_dir)?;
118 let fstarnix::ControllerGetJobHandleResponse { job, .. } =
119 container_controller.get_job_handle().await?;
120 let Some(job) = job else {
121 anyhow::bail!("expected to find job in ControllerGetJobHandleResponse");
122 };
123
124 let kernel = Self {
125 name: kernel_name,
126 exposed_dir,
127 component_instance,
128 job: Arc::new(job),
129 wake_lease: Default::default(),
130 };
131 let on_stop = async move {
132 _ = controller_proxy.into_channel().unwrap().on_closed().await;
133 };
134 Ok((kernel, on_stop))
135 }
136
137 pub fn component_instance(&self) -> &zx::Event {
139 &self.component_instance
140 }
141
142 pub fn job(&self) -> &Arc<zx::Job> {
144 &self.job
145 }
146
147 pub fn connect_to_protocol<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
149 fclient::connect_to_protocol_at_dir_root::<P>(&self.exposed_dir)
150 }
151}
152
153fn generate_kernel_name(_start_info: &frunner::ComponentStartInfo) -> Result<String, Error> {
159 let random_id: String =
160 rand::rng().sample_iter(&rand::distr::Alphanumeric).take(7).map(char::from).collect();
161 Ok(random_id)
162}
163
164async fn open_exposed_directory(
165 realm: &fcomponent::RealmProxy,
166 child_name: &str,
167 collection_name: &str,
168) -> Result<fio::DirectoryProxy, Error> {
169 let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
170 realm
171 .open_exposed_dir(
172 &fdecl::ChildRef { name: child_name.into(), collection: Some(collection_name.into()) },
173 server_end,
174 )
175 .await?
176 .map_err(|e| {
177 anyhow!(
178 "failed to bind to child {} in collection {:?}: {:?}",
179 child_name,
180 collection_name,
181 e
182 )
183 })?;
184 Ok(directory_proxy)
185}
186
187pub struct SuspendMessage {
188 pub payload: fstarnixrunner::ManagerSuspendContainerRequest,
189 pub responder: fstarnixrunner::ManagerSuspendContainerResponder,
190}
191
192pub async fn run_suspend_worker(
193 receiver: async_channel::Receiver<SuspendMessage>,
194 suspend_context: Arc<SuspendContext>,
195 kernels: &Kernels,
196) {
197 while let Ok(msg) = receiver.recv().await {
198 suspend_context.suspend_attempts_count.add(1);
199 match suspend_container(msg.payload, &suspend_context, &kernels).await {
200 Ok(response) => {
201 if let Err(e) = match response {
202 Ok(o) => {
203 suspend_context.suspend_successes_count.add(1);
204 if let Some(suspend_time) = o.suspend_time {
205 suspend_context.suspend_duration_histogram.insert(suspend_time as u64);
206 }
207 msg.responder.send(Ok(&o))
208 }
209 Err(e) => {
210 suspend_context.suspend_failures_count.add(1);
211 msg.responder.send(Err(e))
212 }
213 } {
214 warn!("error replying to suspend request: {e}");
215 }
216 }
217 Err(e) => {
218 suspend_context.suspend_failures_count.add(1);
219 warn!("error executing suspend: {e}");
220 let _ = msg.responder.send(Err(fstarnixrunner::SuspendError::SuspendFailure));
221 }
222 }
223 }
224}
225
226pub async fn serve_starnix_manager(
227 mut stream: fstarnixrunner::ManagerRequestStream,
228 suspend_context: Arc<SuspendContext>,
229 sender: &async_channel::Sender<(ChannelProxy, Arc<LockDepMutex<WakeSources, TerminalLock>>)>,
230 pager: Arc<Pager>,
231 suspend_sender: &async_channel::Sender<SuspendMessage>,
232) -> Result<(), Error> {
233 while let Some(event) = stream.try_next().await? {
234 match event {
235 fstarnixrunner::ManagerRequest::SuspendContainer { payload, responder, .. } => {
236 if let Err(e) = suspend_sender.try_send(SuspendMessage { payload, responder }) {
237 warn!("failed to send suspend request to worker: {e}");
238 }
239 }
240 fstarnixrunner::ManagerRequest::ProxyWakeChannel { payload, .. } => {
241 let fstarnixrunner::ManagerProxyWakeChannelRequest {
242 container_job: Some(_container_job),
244 remote_channel: Some(remote_channel),
245 container_channel: Some(container_channel),
246 name: Some(name),
247 counter: Some(message_counter),
248 ..
249 } = payload
250 else {
251 continue;
252 };
253
254 suspend_context.wake_sources.lock().insert(
255 message_counter.koid().unwrap(),
256 WakeSource::from_counter(
257 message_counter.duplicate_handle(zx::Rights::SAME_RIGHTS)?,
258 name.clone(),
259 ),
260 );
261
262 let proxy =
263 ChannelProxy { container_channel, remote_channel, message_counter, name };
264
265 sender.try_send((proxy, suspend_context.wake_sources.clone())).unwrap();
266 }
267 fstarnixrunner::ManagerRequest::RegisterWakeWatcher { payload, responder } => {
268 if let Some(watcher) = payload.watcher {
269 let (clear_mask, set_mask) = (ASLEEP_SIGNAL, AWAKE_SIGNAL);
270 watcher.signal_peer(clear_mask, set_mask)?;
271
272 suspend_context.wake_watchers.lock().push(watcher);
273 }
274 if let Err(e) = responder.send() {
275 warn!("error registering power watcher: {e}");
276 }
277 }
278 fstarnixrunner::ManagerRequest::AddWakeSource { payload, .. } => {
279 let fstarnixrunner::ManagerAddWakeSourceRequest {
280 container_job: Some(_container_job),
282 name: Some(name),
283 handle: Some(handle),
284 signals: Some(signals),
285 ..
286 } = payload
287 else {
288 continue;
289 };
290 suspend_context.wake_sources.lock().insert(
291 handle.koid().unwrap(),
292 WakeSource::from_handle(
293 handle,
294 name.clone(),
295 zx::Signals::from_bits_truncate(signals),
296 ),
297 );
298 }
299 fstarnixrunner::ManagerRequest::RemoveWakeSource { payload, .. } => {
300 let fstarnixrunner::ManagerRemoveWakeSourceRequest {
301 container_job: Some(_container_job),
303 handle: Some(handle),
304 ..
305 } = payload
306 else {
307 continue;
308 };
309
310 suspend_context.wake_sources.lock().remove(&handle.koid().unwrap());
311 }
312 fstarnixrunner::ManagerRequest::CreatePager { payload, .. } => {
313 fasync::Task::spawn(run_pager(payload, pager.clone())).detach();
314 }
315 _ => {}
316 }
317 }
318 Ok(())
319}