WEB3DEV

Cover image for libp2p - pubsub com Golang
Paulo Gio
Paulo Gio

Posted on • Atualizado em

libp2p - pubsub com Golang

libp2p gossipsub

Contexto da Discussão

Neste post, vou discutir sobre a construção de um sistema publicador/subscritor (pubsub) ponto a ponto com libp2p-pubsub. A implementação foi feita com a biblioteca go-libp2p-pubsub. Todos os códigos-fonte relacionados a este post estão disponíveis no Gitlab. Por favor, clone o repositório e continue o post.

libp2p

libp2p é um framework de rede que permite escrever aplicativos ponto a ponto (p2p) descentralizados. Ele evoluiu do IPFS e se tornou um projeto independente. Certamente há uma tonelada de aplicativos sendo construídos usando libp2p. IPFS, Parity, Ethereum, Filecoin, Polkadot são alguns exemplos populares. O libp2p fornece soluções flexíveis para elementos p2p essenciais, como transporte, segurança, roteamento entre pares e descoberta de conteúdo. Ele consiste em um catálogo de módulos a partir do qual os desenvolvedores de redes p2p podem selecionar e reutilizar apenas os protocolos necessários, facilitando a atualização e a interoperabilidade entre os aplicativos. libp2p tem implementações em Go, JavaScript, Rust, Python e C++. A especificação do libp2p pode ser encontrada em seu repositório de especificações.

libp2p-pubsub

pubsub é um sistema de mensagens usado para aprimorar a comunicação assíncrona entre remetentes e destinatários. Nesse sistema, um nó pode publicar conteúdo em um tópico enquanto outro nó pode se inscrever em cada tópico de interesse. No sistema pubsub p2p, os pares participam entregando mensagens aos grupos de receptores interessados sem depender de uma infraestrutura centralizada.

O libp2p-pubsub criou várias implementações ponto a ponto do sistema pubsub (por exemplo, gossipsub, floodsub, fpisub), que permitiram o desenvolvimento de aplicativos pubsub p2p em tempo real (leia mais sobre essas diferentes implementações de pubsub aqui). Neste post eu usei o libp2p gossipsub para desenvolver o aplicativo pubsub p2p. gossipsub é nomeado após o fato de que os pares fofocam (gossip) entre si sobre quais mensagens eles viram e usam essas informações para manter uma rede de entrega de mensagens.

Estruturação

Eu usei a biblioteca libp2p-pubsub para construir o Librumchain, que é um armazenamento blockchain leve e altamente escalável. No Librumchain, há um grupo de consenso central que é executado em nós baseados em nuvem com um consenso Proof-of-Authority. Os nós de borda foram projetados para serem executados em nós leves baseados em Raspberry-Pi. O grupo de consenso principal gera blocos e os armazena no IPFS-Cluster (o IPFS-Cluster foi usado como armazenamento de blocos). As informações de bloco geradas (por exemplo, o hash IPFS do bloco) serão publicadas nos nós de borda Raspberry-Pi via libp2p gossipsub. A figura a seguir descreve a arquitetura do Librumchain com os nós do grupo de consenso Central, nós IPFS-Cluster e nós de borda Raspberry-Pi.

https://miro.medium.com/max/1400/1*44Mk45QPfkfVIZLO-6qSKw.png

Neste post, discutimos sobre o desenvolvimento de um simples aplicativo pubsub com libp2p gossipsub. No aplicativo pubsub, o publicador recebe mensagens da linha de comando e as envia para um tópico chamado librum. Em seguida, os subscritores deste tópico librum recebem os dados via gossipsub p2p. Esta é uma versão simplificada do sistema pubsub do Librumchain.

https://miro.medium.com/max/1400/1*clj42llqgK-rkICx9zERbA.png

Publicador gossipsub

A seguir está a implementação golang do publicador. Eu adicionei comentários para descrever a função de cada linha no código. Ele cria o host libp2p, o roteador gossipsub, configura a descoberta do nó com mDNS (DiscoveryServiceTag, usado para descobrir os pares na rede do tópico) e cria o publicador no tópico do librum. O publicador recebe mensagens acessando a linha de comando, e então publica no tópico.

package main

import (
    "bufio"
    "context"
    "fmt"
    "os"
    "time"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/peer"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)

// DiscoveryInterval é a frequência com que republicamos nossos registros mDNS.
const DiscoveryInterval = time.Hour

// DiscoveryServiceTag é usado em nossos anúncios mDNS para descobrir outros pares para chat.
const DiscoveryServiceTag = "librum-pubsub"

func main() {
    ctx := context.Background()

    // criar um novo host libp2p que acessa em uma porta TCP aleatória
    // podemos especificar uma porta como /ip4/0.0.0.0/tcp/3326
    host, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
    if err != nil {
        panic(err)
    }

    // ver detalhes e endereços do host
    fmt.Printf("host ID %s\n", host.ID().Pretty())
    fmt.Printf("following are the assigned addresses\n")
    for _, addr := range host.Addrs() {
        fmt.Printf("%s\n", addr.String())
    }
    fmt.Printf("\n")

    // criar um novo serviço PubSub usando o roteador GossipSub
    gossipSub, err := pubsub.NewGossipSub(ctx, host)
    if err != nil {
        panic(err)
    }

    // configurar descoberta de mDNS local
    if err := setupDiscovery(host); err != nil {
        panic(err)
    }

    // juntar-se ao tópico pubsub chamado librum
    room := "librum"
    topic, err := gossipSub.Join(room)
    if err != nil {
        panic(err)
    }

    // criar publicador
    publish(ctx, topic)
}

// iniciar publicador para tópico
func publish(ctx context.Context, topic *pubsub.Topic) {
    for {
        scanner := bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
            fmt.Printf("digite a mensagem para publicar: \n")

            msg := scanner.Text()
            if len(msg) != 0 {
                // publicar mensagem no tópico
                bytes := []byte(msg)
                topic.Publish(ctx, bytes)
            }
        }
    }
}

// DiscoveryNotifee é notificado quando encontramos um novo par por meio da descoberta mDNS
type discoveryNotifee struct {
    h host.Host
}

// HandlePeerFound se conecta a pares descobertos via mDNS. Assim que estiverem conectados, o sistema PubSub começará a interagir automaticamente com eles se também suportarem o PubSub
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
    fmt.Printf("discovered new peer %s\n", pi.ID.Pretty())
    err := n.h.Connect(context.Background(), pi)
    if err != nil {
        fmt.Printf("error connecting to peer %s: %s\n", pi.ID.Pretty(), err)
    }
}

// setupDiscovery cria um serviço de descoberta mDNS e o anexa ao Host libp2p.
// Isso nos permite descobrir automaticamente pares na mesma LAN e nos conectar a eles.
func setupDiscovery(h host.Host) error {
    // configurar descoberta de mDNS para encontrar pares locais
    s := mdns.NewMdnsService(h, DiscoveryServiceTag, &discoveryNotifee{h: h})
    return s.Start()
}
Enter fullscreen mode Exit fullscreen mode

Subscritor gossipsub

A seguir está a implementação golang do subscritor. Eu adicionei comentários para descrever a função de cada linha no código. Ele cria o host libp2p, o roteador gossipsub, configura a descoberta do nó com mDNS (DiscoveryServiceTag, usado para descobrir os pares na rede do tópico) e cria um subscritor para o tópico librum.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/peer"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)

// DiscoveryInterval é a frequência com que republicamos nossos registros mDNS.
const DiscoveryInterval = time.Hour

// DiscoveryServiceTag é usado em nossos anúncios mDNS para descobrir outros pares para chat.
const DiscoveryServiceTag = "librum-pubsub"

func main() {
    ctx := context.Background()

    // criar um novo host libp2p que acessa em uma porta TCP aleatória
    host, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
    if err != nil {
        panic(err)
    }

    // ver detalhes e endereços do host
    fmt.Printf("host ID %s\n", host.ID().Pretty())
    fmt.Printf("following are the assigned addresses\n")
    for _, addr := range host.Addrs() {
        fmt.Printf("%s\n", addr.String())
    }
    fmt.Printf("\n")

    // criar um novo serviço PubSub usando o roteador GossipSub
    gossipSub, err := pubsub.NewGossipSub(ctx, host)
    if err != nil {
        panic(err)
    }

    // configurar descoberta de mDNS local
    if err := setupDiscovery(host); err != nil {
        panic(err)
    }

    // juntar-se ao tópico pubsub chamado librum
    room := "librum"
    topic, err := gossipSub.Join(room)
    if err != nil {
        panic(err)
    }

    // inscrever no tópico
    subscriber, err := topic.Subscribe()
    if err != nil {
        panic(err)
    }
    subscribe(subscriber, ctx, host.ID())
}

// iniciar o subscritor do tópico
func subscribe(subscriber *pubsub.Subscription, ctx context.Context, hostID peer.ID) {
    for {
        msg, err := subscriber.Next(ctx)
        if err != nil {
            panic(err)
        }

        // considerar apenas mensagens entregues por outros pares
        if msg.ReceivedFrom == hostID {
            continue
        }

        fmt.Printf("got message: %s, from: %s\n", string(msg.Data), msg.ReceivedFrom.Pretty())
    }
}

// DiscoveryNotifee é notificado quando encontramos um novo par por meio da descoberta mDNS
type discoveryNotifee struct {
    h host.Host
}

// HandlePeerFound se conecta a pares descobertos via mDNS. Assim que estiverem conectados, o sistema PubSub começará a interagir automaticamente com eles se também suportarem o PubSub.
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
    fmt.Printf("discovered new peer %s\n", pi.ID.Pretty())
    err := n.h.Connect(context.Background(), pi)
    if err != nil {
        fmt.Printf("error connecting to peer %s: %s\n", pi.ID.Pretty(), err)
    }
}

// setupDiscovery cria um serviço de descoberta mDNS e o anexa ao Host libp2p.
// Isso nos permite descobrir automaticamente pares na mesma LAN e nos conectar a eles.
func setupDiscovery(h host.Host) error {
    // configurar descoberta de mDNS para encontrar pares locais
    s := mdns.NewMdnsService(h, DiscoveryServiceTag, &discoveryNotifee{h: h})
    return s.Start()
}
Enter fullscreen mode Exit fullscreen mode

Executar aplicativo

A seguir está descrito como construir e executar o aplicativo. Executei com dois subscritores e um publicador. Os dados da publicação estarão disponíveis para os subscritores em tempo real.

# instalar dependências do golang  
go mod init
go mod tidy

---

# criar publicador e subscritores no diretório ./build
go build -o build/publisher src/publisher.go
go build -o build/subscriber src/subsciber.go

---

# executar subscritores1 (no terminal 1)
./build/subscriber

# executar subscritores2 (no terminal 2)
./build/subscriber

# executar publicador (no terminal 3)
./build/publisher
Enter fullscreen mode Exit fullscreen mode

https://miro.medium.com/max/1400/1*jKN9rHtoSEoaOTG89PZw9Q.png

Referências

  1. https://spec.filecoin.io/libraries/libp2p/
  2. https://www.parity.io/blog/why-libp2p
  3. https://blog.ipfs.io/2020-05-20-gossipsub-v1.1/
  4. https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub
  5. https://hackernoon.com/exploring-libp2p-pubsub-protocol-implementations-891i32jq
  6. https://consensys.net/diligence/vulnerabilities/eth2-teku-dos-gossipsub/
  7. https://docs.libp2p.io/concepts/publish-subscribe/
  8. https://github.com/libp2p/specs/tree/master/pubsub/gossipsub
  9. https://github.com/libp2p/go-libp2p/tree/master/examples/pubsub/chat

Este artigo foi escrito por λ.eranga, e traduzido por Paulinho Giovannini. Encontre o artigo original aqui.

Top comments (0)