This section of the blog covers the code path taken by Ceph when a read or write request is initiated by a Ceph client.
It may not capture 100% of the code flow, but it is a great starting point for readers. I hope it makes things easier for new Ceph users and saves a bunch of engineering time.
I used gdb extensively in preparing the code snippets below. In the meantime, I will try to add more detail as I gather new information during my debugging effort.
My setup has 1 Ceph node and 1 Ceph client running fio to perform reads and writes to a RADOS block device.
In Ceph, data is handled by messengers. Each daemon, such as ceph-mon and ceph-osd, creates specific messengers to handle incoming requests. Requests can originate from a client or from the cluster.
In the case of ceph-osd, the following messengers are created to handle requests:
client : Handle request from Client
cluster: Handle request from osd, monitors..
hbclient : Heartbeat ??
hb_back_server: Heartbeat ??
hb_front_server: Heartbeat ??
ms_objecter :
src/ceph_osd.cc
There are three types of Messenger: SimpleMessenger, AsyncMessenger, and XioMessenger.
Messenger::create(): creates a SimpleMessenger object.
SimpleMessenger.cc (msg/simple/SimpleMessenger.cc)
SimpleMessenger::SimpleMessenger()
{
Initialize dispatch_queue and reaper_thread.
}
src/ceph_osd.cc
Start all the created messengers:
ms_objecter->start()
cluster->start()
SimpleMessenger::start()
{
Start the reaper thread, which loops in SimpleMessenger::reaper_entry() and waits, since the OSD hasn't started yet.
}
src/ceph_osd.cc
===========
osd->init() // start the OSD
osd/OSD.cc
=========
OSD::init()
{
Add an individual dispatcher for each Messenger object. (More detail to be added.)
client_messenger->add_dispatcher_head() --> SimpleMessenger::ready
cluster_messenger->add_dispatcher_head() --> SimpleMessenger::ready
}
SimpleMessenger.cc (msg/simple/SimpleMessenger.cc)
==========================================
SimpleMessenger::ready()
{
Start the dispatch queue thread: dispatch_queue.start()
Start the Accepter: accepter.start()
}
Accepter::start()
{
create() a thread whose entry point is Accepter::entry()
}
Accepter Thread
This routine polls for incoming connections from Ceph clients. SimpleMessenger binds to a specific address and port.
Accepter::entry() { //msg/simple/Accepter.cc
This is where incoming connection requests are actually processed.
accept(listen_sd, .....)
}
The Accepter hands the socket fd over to SimpleMessenger, which starts reading the messages sent over the incoming connection.
SimpleMessenger::add_accept_pipe(int sd) { // msg/simple/SimpleMessenger.cc
-> p->start_reader
}
void Pipe::start_reader() //msg/simple/Pipe.cc
-> reader_thread.create(); // create reader thread common/Thread.cc
-> Thread::entry_wrapper() {
return entry() --> virtual call; dispatches to the entry function of the derived class
}
// entry function in the derived class: Pipe::Reader::entry()
*entry() { //msg/simple/Pipe.h
pipe->reader()
}
void Pipe::reader() { //msg/simple/Pipe.cc
if the message can be fast-dispatched
in_q->fast_dispatch(m);
else
in_q->enqueue(m, m->get_priority(), conn_id);
}
DispatchQueue::fast_dispatch(Message *m) {
-> msgr->ms_fast_dispatch(m); // msg/Messenger.h
}
Send a single message via fast dispatch
ms_fast_dispatch(Message *m) { // msg/Messenger.h
call OSD::ms_fast_dispatch();
}
OSD::ms_fast_dispatch(Message *m) { //OSD.cc
dispatch_session_waiting
}
OSD::dispatch_session_waiting(Message *m) { //OSD.cc
dispatch_op_fast(m);
}
Client-issued OSD operations are handled by fast dispatch here.
OSD::dispatch_op_fast { //OSD.cc
switch(op->get_req()->get_type()) {
// client ops
case CEPH_MSG_OSD_OP:
handle_op;
case MSG_OSD_XXXX:
}
}
OSD::handle_op() //OSD.cc
{
PG *pg = get_pg_or_queue_for_pg(pgid, op);
enqueue_op(pg, op);
}
OSD::enqueue_op(pg, op)
{
pg->queue_op(op);
}
PG::queue_op(OpRequestRef& op) { //osd/PG.cc
osd->op_wq.queue()
}
void queue(T item) { //common/WorkQueue.h
_enqueue(item);
}
Dequeue thread, ready to process the enqueued operation:
void OSD::ShardedOpWQ::_process { //OSD.cc
osd->dequeue_op(item.first, op, tp_handle);
}
OSD::dequeue_op() { //OSD.cc
pg->do_request(op, handle);
}
void ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle) { //ReplicatedPG.cc
case CEPH_MSG_OSD_OP:
do_op();
}
ReplicatedPG::do_op() {
execute_ctx();
}
ReplicatedPG::execute_ctx() { //ReplicatedPG.cc
prepare_transaction();
issue_repop(); // issue the transaction
}
ReplicatedPG::prepare_transaction() {
do_osd_ops();
}
ReplicatedPG::do_osd_ops() {
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_READ:
}
void ReplicatedPG::issue_repop(RepGather *repop) { //ReplicatedPG.cc
pgbackend->submit_transaction() //submit op to the backend
}
void ReplicatedBackend::submit_transaction() { // ReplicatedBackend.cc
issue_op();
parent->queue_transaction(op_t, op.op);
}
int FileStore::queue_transactions() { //FileStore.cc
if (journal) {
osr->queue_journal(o->op);
/* Once the journal transaction is finished, the _journaled_ahead callback will be called */
_op_journal_transactions(.., new C_JournaledAhead() ); // --> JournalingObjectStore.cc
}
}
JournalingObjectStore::_op_journal_transactions() { //JournalingObjectStore.cc
journal->submit_entry(); //queue the request
}
FileJournal::submit_entry() {
queue the request ...
}
FileJournal::write_thread_entry() {
perform write... (I will come back to expand on this further.)
}
FileStore::_journaled_ahead() {
/* Journal write was successful */
queue_op();
}
FileStore::queue_op () {
op_wq.queue();
}
Worker Thread
ThreadPool::worker() { //WorkQueue.cc
wq->_void_process () //Queue for Processing.
-------> _process() //WorkQueue.h
wq->_void_process_finish(item); // Enqueue after processing.
--------> _process_finish
}
FileStore::_process() { //FileStore.h
store->_do_op();
}
FileStore::_process_finish() {
store->_finish_op();
}
FileStore::_finish_op() {
// dequeue
}
FileStore::_do_op() { //FileStore.cc
_do_transactions(); --> _do_transaction()
}
FileStore::_do_transaction() {
case Transaction::OP_WRITE
_write();
}
Deliver a single message to each dispatcher in sequence.
Messenger::ms_deliver_dispatch() { //msg/Messenger.h
((*p)->ms_dispatch(m)) //osd/OSD.cc
}
OSD::ms_dispatch(Message *m) {
_dispatch() // osd/OSD.cc
}
Client management-related operations are handled here.
void OSD::_dispatch(Message *m) {
switch (m->get_type()) {
case CEPH_MSG_PING:
case CEPH_MSG_OSD_MAP:
}
}
It may not capture 100% of the code flow, but it is a great starting point for readers. I hope it makes things easier for new Ceph users and saves a bunch of engineering time.
I used gdb extensively in preparing the code snippets below. In the meantime, I will try to add more detail as I gather new information during my debugging effort.
My setup has 1 Ceph node and 1 Ceph client running fio to perform reads and writes to a RADOS block device.
In Ceph, data is handled by messengers. Each daemon, such as ceph-mon and ceph-osd, creates specific messengers to handle incoming requests. Requests can originate from a client or from the cluster.
In the case of ceph-osd, the following messengers are created to handle requests:
client : Handle request from Client
cluster: Handle request from osd, monitors..
hbclient : Heartbeat ??
hb_back_server: Heartbeat ??
hb_front_server: Heartbeat ??
ms_objecter :
src/ceph_osd.cc
There are three types of Messenger: SimpleMessenger, AsyncMessenger, and XioMessenger.
Messenger::create(): creates a SimpleMessenger object.
SimpleMessenger.cc (msg/simple/SimpleMessenger.cc)
SimpleMessenger::SimpleMessenger()
{
Initialize dispatch_queue and reaper_thread.
}
src/ceph_osd.cc
Start all the created messengers:
ms_objecter->start()
cluster->start()
SimpleMessenger::start()
{
Start the reaper thread, which loops in SimpleMessenger::reaper_entry() and waits, since the OSD hasn't started yet.
}
src/ceph_osd.cc
===========
osd->init() // start the OSD
osd/OSD.cc
=========
OSD::init()
{
Add an individual dispatcher for each Messenger object. (More detail to be added.)
client_messenger->add_dispatcher_head() --> SimpleMessenger::ready
cluster_messenger->add_dispatcher_head() --> SimpleMessenger::ready
}
SimpleMessenger.cc (msg/simple/SimpleMessenger.cc)
==========================================
SimpleMessenger::ready()
{
Start the dispatch queue thread: dispatch_queue.start()
Start the Accepter: accepter.start()
}
Accepter::start()
{
create() a thread whose entry point is Accepter::entry()
}
Accepter Thread
This routine polls for incoming connections from Ceph clients. SimpleMessenger binds to a specific address and port.
Accepter::entry() { //msg/simple/Accepter.cc
This is where incoming connection requests are actually processed.
accept(listen_sd, .....)
}
The Accepter hands the socket fd over to SimpleMessenger, which starts reading the messages sent over the incoming connection.
SimpleMessenger::add_accept_pipe(int sd) { // msg/simple/SimpleMessenger.cc
-> p->start_reader
}
void Pipe::start_reader() //msg/simple/Pipe.cc
-> reader_thread.create(); // create reader thread common/Thread.cc
-> Thread::entry_wrapper() {
return entry() --> virtual call; dispatches to the entry function of the derived class
}
// entry function in the derived class: Pipe::Reader::entry()
*entry() { //msg/simple/Pipe.h
pipe->reader()
}
void Pipe::reader() { //msg/simple/Pipe.cc
if the message can be fast-dispatched
in_q->fast_dispatch(m);
else
in_q->enqueue(m, m->get_priority(), conn_id);
}
DispatchQueue::fast_dispatch(Message *m) {
-> msgr->ms_fast_dispatch(m); // msg/Messenger.h
}
Send a single message via fast dispatch
ms_fast_dispatch(Message *m) { // msg/Messenger.h
call OSD::ms_fast_dispatch();
}
OSD::ms_fast_dispatch(Message *m) { //OSD.cc
dispatch_session_waiting
}
OSD::dispatch_session_waiting(Message *m) { //OSD.cc
dispatch_op_fast(m);
}
Client-issued OSD operations are handled by fast dispatch here.
OSD::dispatch_op_fast { //OSD.cc
switch(op->get_req()->get_type()) {
// client ops
case CEPH_MSG_OSD_OP:
handle_op;
case MSG_OSD_XXXX:
}
}
OSD::handle_op() //OSD.cc
{
PG *pg = get_pg_or_queue_for_pg(pgid, op);
enqueue_op(pg, op);
}
OSD::enqueue_op(pg, op)
{
pg->queue_op(op);
}
PG::queue_op(OpRequestRef& op) { //osd/PG.cc
osd->op_wq.queue()
}
void queue(T item) { //common/WorkQueue.h
_enqueue(item);
}
Dequeue thread, ready to process the enqueued operation:
void OSD::ShardedOpWQ::_process { //OSD.cc
osd->dequeue_op(item.first, op, tp_handle);
}
OSD::dequeue_op() { //OSD.cc
pg->do_request(op, handle);
}
void ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle) { //ReplicatedPG.cc
case CEPH_MSG_OSD_OP:
do_op();
}
ReplicatedPG::do_op() {
execute_ctx();
}
ReplicatedPG::execute_ctx() { //ReplicatedPG.cc
prepare_transaction();
issue_repop(); // issue the transaction
}
ReplicatedPG::prepare_transaction() {
do_osd_ops();
}
ReplicatedPG::do_osd_ops() {
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_READ:
}
void ReplicatedPG::issue_repop(RepGather *repop) { //ReplicatedPG.cc
pgbackend->submit_transaction() //submit op to the backend
}
void ReplicatedBackend::submit_transaction() { // ReplicatedBackend.cc
issue_op();
parent->queue_transaction(op_t, op.op);
}
int FileStore::queue_transactions() { //FileStore.cc
if (journal) {
osr->queue_journal(o->op);
/* Once the journal transaction is finished, the _journaled_ahead callback will be called */
_op_journal_transactions(.., new C_JournaledAhead() ); // --> JournalingObjectStore.cc
}
}
JournalingObjectStore::_op_journal_transactions() { //JournalingObjectStore.cc
journal->submit_entry(); //queue the request
}
FileJournal::submit_entry() {
queue the request ...
}
FileJournal::write_thread_entry() {
perform write... (I will come back to expand on this further.)
}
FileStore::_journaled_ahead() {
/* Journal write was successful */
queue_op();
}
FileStore::queue_op () {
op_wq.queue();
}
Worker Thread
ThreadPool::worker() { //WorkQueue.cc
wq->_void_process () //Queue for Processing.
-------> _process() //WorkQueue.h
wq->_void_process_finish(item); // Enqueue after processing.
--------> _process_finish
}
FileStore::_process() { //FileStore.h
store->_do_op();
}
FileStore::_process_finish() {
store->_finish_op();
}
FileStore::_finish_op() {
// dequeue
}
FileStore::_do_op() { //FileStore.cc
_do_transactions(); --> _do_transaction()
}
FileStore::_do_transaction() {
case Transaction::OP_WRITE
_write();
}
Deliver a single message to each dispatcher in sequence.
Messenger::ms_deliver_dispatch() { //msg/Messenger.h
((*p)->ms_dispatch(m)) //osd/OSD.cc
}
OSD::ms_dispatch(Message *m) {
_dispatch() // osd/OSD.cc
}
Client management-related operations are handled here.
void OSD::_dispatch(Message *m) {
switch (m->get_type()) {
case CEPH_MSG_PING:
case CEPH_MSG_OSD_MAP:
}
}