Google Pub/Sub Java examples

Andrea Zonzin picture Andrea Zonzin · Jun 11, 2017 · Viewed 10.2k times · Source

I'm not able to find a way to read messages from pub/sub using java.

I'm using this maven dependency in my pom

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.17.2-alpha</version>
</dependency>

I implemented this main method to create a new topic:

public static void main(String... args) throws Exception {

        // Your Google Cloud Platform project ID
        String projectId = ServiceOptions.getDefaultProjectId();

        // Your topic ID
        String topicId = "my-new-topic-1";
        // Create a new topic
        TopicName topic = TopicName.create(projectId, topicId);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic); 
        }
}

The above code works well and, indeed, I can see the new topic I created using the google cloud console.

I implemented the following main method to write a message to my topic:

public static void main(String a[]) throws InterruptedException, ExecutionException{
        String projectId = ServiceOptions.getDefaultProjectId(); 
        String topicId = "my-new-topic-1";

        String payload = "Hellooooo!!!";
        PubsubMessage pubsubMessage =
                  PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

        TopicName topic = TopicName.create(projectId, topicId);

        Publisher publisher;
        try {
            publisher = Publisher.defaultBuilder(
                    topic)
                    .build();
            publisher.publish(pubsubMessage);

            System.out.println("Sent!");
        } catch (IOException e) {
            System.out.println("Not Sended!");
            e.printStackTrace();
        }
}

Now I'm not able to verify if this message was really sent. I would like to implement a message reader using a subscription to my topic. Could someone show me a correct and working java example about reading messages from a topic?

Anyone can help me? Thanks in advance!

Answer

Kishore Namala picture Kishore Namala · Mar 21, 2019

Here is the version using the google cloud client libraries.


package com.techm.data.client;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull
 * subscription and asynchronously pull messages from it.
 */
public class CreateSubscriptionAndConsumeMessages {

    private static String projectId = "projectId";
    private static String topicId = "topicName";
    private static String subscriptionId = "subscriptionName";

    public static void createSubscription() throws Exception {
        ProjectTopicName topic = ProjectTopicName.of(projectId, topicId);
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);

        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
        }
    }

    public static void main(String... args) throws Exception {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);       

        createSubscription();


        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                System.out.println("Received message: " + message.getData().toStringUtf8());
                consumer.ack();
            }
        };
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.addListener(new Subscriber.Listener() {
                @Override
                public void failed(Subscriber.State from, Throwable failure) {
                    // Handle failure. This is called when the Subscriber encountered a fatal error
                    // and is
                    // shutting down.
                    System.err.println(failure);
                }
            }, MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();         

            // In this example, we will pull messages for one minute (60,000ms) then stop.
            // In a real application, this sleep-then-stop is not necessary.
            // Simply call stopAsync().awaitTerminated() when the server is shutting down,
            // etc.
            Thread.sleep(60000);
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }
}

This is working fine for me.