Python
Overview
Our service takes in a payload containing bytes and capitalizes them.
Using OpenCensus, we can collect traces of our system and export them to the backend of our choice, to give observability to our distributed systems.
Before beginning, if you haven’t already:
- Set up gRPC for Python by visiting this quickstart page https://grpc.io/docs/quickstart/python.html
- Set up Stackdriver Tracing and Monitoring
Setup
Make sure to set up your GOOGLE_APPLICATION_CREDENTIALS
environment variable. See the Google Cloud authentication documentation for instructions on how to do so.
Installation
This walkthrough will be using Python 3.
Install the required modules by running this command in your terminal:
python3 -m pip install grpcio-tools opencensus google-cloud-trace
Next, let’s set up our working directory. Run the following commands in your terminal:
touch capitalizeServer.py
touch capitalizeClient.py
mkdir proto
touch proto/defs.proto
Our working directory will now look like this:
./capitalizeServer.py
./capitalizeClient.py
./proto/
./proto/defs.proto
Protobuf Definition
Copy and paste the following code inside of ./proto/defs.proto
:
// Protobuf definitions for the capitalization service.
syntax = "proto3";
package rpc;
// Payload is both the request and the response type of Capitalize:
// an integer id plus the raw bytes being sent or returned.
message Payload {
int32 id = 1;
bytes data = 2;
}
// Fetch exposes a single unary RPC, Capitalize, that takes a Payload and
// returns a Payload.
service Fetch {
rpc Capitalize(Payload) returns (Payload) {}
}
Now, run the following command in your terminal to create the gRPC stubs.
python3 -m grpc_tools.protoc \
-I./proto \
--python_out=. \
--grpc_python_out=. ./proto/defs.proto
This will create two new files. Your working directory will be:
./defs_pb2.py
./defs_pb2_grpc.py
./capitalizeServer.py
./capitalizeClient.py
./proto/
./proto/defs.proto
Generate the Client
Copy and paste the following code inside of ./capitalizeClient.py
:
import grpc
import defs_pb2_grpc as proto
import defs_pb2 as pb
def main():
    """Prompt for lines of text, send each to the server and print the reply.

    Each line is UTF-8 encoded, sent through the Capitalize RPC on
    localhost:9778, and the returned bytes are decoded and printed.
    The loop ends when the user presses Ctrl-D or Ctrl-C at the prompt.
    """
    channel = grpc.insecure_channel('localhost:9778')
    stub = proto.FetchStub(channel)

    while True:
        try:
            lineIn = input('> ')
        except (EOFError, KeyboardInterrupt):
            # End of input or an interrupt at the prompt: leave the loop
            # quietly instead of terminating with a traceback.
            print()
            break

        capitalized = stub.Capitalize(pb.Payload(data=bytes(lineIn, encoding='utf-8')))
        print('< %s\n'%(capitalized.data.decode('utf-8')))


if __name__ == '__main__':
    main()
Generate the Service
Copy and paste the following code inside of ./capitalizeServer.py
:
import grpc
import time
from concurrent import futures
import defs_pb2_grpc as proto
import defs_pb2 as pb
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Return a Payload whose data is the upper-cased request data."""
        capitalized = request.data.upper()
        return pb.Payload(data=capitalized)
def main():
    """Run the gRPC server on port 9778 until interrupted with Ctrl-C."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Run the Application
Let’s now run two of our python files at once. You may need to open a second terminal tab.
In one terminal tab, run python3 capitalizeServer.py
.
In the second terminal tab, run python3 capitalizeClient.py
.
Try typing in some text and hitting enter in the tab running capitalizeClient.py
. You should see the text echoed back in upper case, for example:
> hello world
< HELLO WORLD
Instrumentation
Tracing
Open ./capitalizeServer.py
.
First let’s import the required packages:
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
import grpc
import time
from concurrent import futures
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
import defs_pb2_grpc as proto
import defs_pb2 as pb
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Return a Payload whose data is the upper-cased request data."""
        capitalized = request.data.upper()
        return pb.Payload(data=capitalized)
def main():
    """Run the gRPC server on port 9778 until interrupted with Ctrl-C."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Now let’s modify our Capitalize
function to create our span:
def Capitalize(self, request, context):
    """Upper-case the request data inside a span named 'Capitalize'."""
    sampler = always_on.AlwaysOnSampler()
    tracer = Tracer(sampler=sampler)
    with tracer.span(name='Capitalize') as span:
        payload_bytes = request.data
        # Annotate the span with the size of the incoming data.
        span.add_annotation('Data in', len=len(payload_bytes))
        return pb.Payload(data=payload_bytes.upper())
import grpc
import time
from concurrent import futures
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
import defs_pb2_grpc as proto
import defs_pb2 as pb
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Upper-case the request data inside a span named 'Capitalize'."""
        sampler = always_on.AlwaysOnSampler()
        tracer = Tracer(sampler=sampler)
        with tracer.span(name='Capitalize') as span:
            payload_bytes = request.data
            # Annotate the span with the size of the incoming data.
            span.add_annotation('Data in', len=len(payload_bytes))
            return pb.Payload(data=payload_bytes.upper())
def main():
    """Run the gRPC server on port 9778 until interrupted with Ctrl-C."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Finally, let’s modify our main
function to set up the interceptor.
def main():
    """Start the gRPC server on port 9778 with the tracing interceptor."""
    # Set up the gRPC integration/interceptor
    sampler = always_on.AlwaysOnSampler()
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
        sampler)
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool, interceptors=(tracer_interceptor,))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()
import grpc
import time
from concurrent import futures
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
import defs_pb2_grpc as proto
import defs_pb2 as pb
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Upper-case the request data inside a span named 'Capitalize'."""
        sampler = always_on.AlwaysOnSampler()
        tracer = Tracer(sampler=sampler)
        with tracer.span(name='Capitalize') as span:
            payload_bytes = request.data
            # Annotate the span with the size of the incoming data.
            span.add_annotation('Data in', len=len(payload_bytes))
            return pb.Payload(data=payload_bytes.upper())
def main():
    """Run the traced gRPC server on port 9778 until interrupted."""
    # Set up the gRPC integration/interceptor
    sampler = always_on.AlwaysOnSampler()
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
        sampler)
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool, interceptors=(tracer_interceptor,))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Exporting
Import the required packages:
import os
from opencensus.common.transports.async_ import AsyncTransport
from opencensus.trace.exporters import stackdriver_exporter
import grpc
import os
import time
from concurrent import futures
from opencensus.common.transports.async_ import AsyncTransport
from opencensus.trace.exporters import stackdriver_exporter
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
import defs_pb2_grpc as proto
import defs_pb2 as pb
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Upper-case the request data inside a span named 'Capitalize'."""
        sampler = always_on.AlwaysOnSampler()
        tracer = Tracer(sampler=sampler)
        with tracer.span(name='Capitalize') as span:
            payload_bytes = request.data
            # Annotate the span with the size of the incoming data.
            span.add_annotation('Data in', len=len(payload_bytes))
            return pb.Payload(data=payload_bytes.upper())
def main():
    """Run the traced gRPC server on port 9778 until interrupted."""
    # Set up the gRPC integration/interceptor
    sampler = always_on.AlwaysOnSampler()
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
        sampler)
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool, interceptors=(tracer_interceptor,))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Set up the exporter:
# NOTE: Set the GOOGLE_CLOUD_PROJECT environment variable to your actual
# Google Project ID, or replace 'YOUR_GOOGLE_PROJECT_ID_HERE' below with it.
# The placeholder is only the fallback used when the variable is not set.
exporter = stackdriver_exporter.StackdriverExporter(
    project_id=os.environ.get(
        'GOOGLE_CLOUD_PROJECT', 'YOUR_GOOGLE_PROJECT_ID_HERE'),
    transport=AsyncTransport)
import grpc
import os
import time
from concurrent import futures
from opencensus.common.transports.async_ import AsyncTransport
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.trace.exporters import stackdriver_exporter
import defs_pb2_grpc as proto
import defs_pb2 as pb
# NOTE: Set the GOOGLE_CLOUD_PROJECT environment variable to your actual
# Google Project ID, or replace 'YOUR_GOOGLE_PROJECT_ID_HERE' below with it.
# The placeholder is only the fallback used when the variable is not set.
exporter = stackdriver_exporter.StackdriverExporter(
    project_id=os.environ.get(
        'GOOGLE_CLOUD_PROJECT', 'YOUR_GOOGLE_PROJECT_ID_HERE'),
    transport=AsyncTransport)
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Upper-case the request data inside a span named 'Capitalize'."""
        sampler = always_on.AlwaysOnSampler()
        tracer = Tracer(sampler=sampler)
        with tracer.span(name='Capitalize') as span:
            payload_bytes = request.data
            # Annotate the span with the size of the incoming data.
            span.add_annotation('Data in', len=len(payload_bytes))
            return pb.Payload(data=payload_bytes.upper())
def main():
    """Run the traced gRPC server on port 9778 until interrupted."""
    # Set up the gRPC integration/interceptor
    sampler = always_on.AlwaysOnSampler()
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
        sampler)
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool, interceptors=(tracer_interceptor,))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Implement the exporter:
def Capitalize(self, request, context):
    # Pass the exporter to the tracer created for each request.
    tracer = Tracer(sampler=always_on.AlwaysOnSampler(), exporter=exporter)


def main():
    # Pass the same exporter to the gRPC interceptor as its second argument.
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
        always_on.AlwaysOnSampler(), exporter)
import grpc
import os
import time
from concurrent import futures
from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.common.transports.async_ import AsyncTransport
from opencensus.trace.exporters import stackdriver_exporter
import defs_pb2_grpc as proto
import defs_pb2 as pb
# NOTE: Set the GOOGLE_CLOUD_PROJECT environment variable to your actual
# Google Project ID, or replace 'YOUR_GOOGLE_PROJECT_ID_HERE' below with it.
# The placeholder is only the fallback used when the variable is not set.
exporter = stackdriver_exporter.StackdriverExporter(
    project_id=os.environ.get(
        'GOOGLE_CLOUD_PROJECT', 'YOUR_GOOGLE_PROJECT_ID_HERE'),
    transport=AsyncTransport)
class CapitalizeServer(proto.FetchServicer):
    """gRPC servicer that implements the Fetch service's Capitalize RPC."""

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to the base class.
        super().__init__()

    def Capitalize(self, request, context):
        """Upper-case the request data inside an exported 'Capitalize' span."""
        sampler = always_on.AlwaysOnSampler()
        # The tracer is given the module-level exporter.
        tracer = Tracer(sampler=sampler, exporter=exporter)
        with tracer.span(name='Capitalize') as span:
            payload_bytes = request.data
            # Annotate the span with the size of the incoming data.
            span.add_annotation('Data in', len=len(payload_bytes))
            return pb.Payload(data=payload_bytes.upper())
def main():
    """Run the traced, exporting gRPC server on port 9778 until interrupted."""
    # Set up the gRPC integration/interceptor
    sampler = always_on.AlwaysOnSampler()
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
        sampler, exporter)
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool, interceptors=(tracer_interceptor,))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    # Keep the main thread alive while the server handles requests.
    one_hour = 60 * 60
    try:
        while True:
            time.sleep(one_hour)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    main()
Examining Traces
Please visit https://console.cloud.google.com/traces/traces
which will give visuals such as: